Skip to content

Commit

Permalink
Allow different services to use the same host-mode port
Browse files Browse the repository at this point in the history
Relax the validation in controlapi to allow two services to use the same
host-mode published port.

Add a scheduler filter so that tasks from these services land on
different nodes.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
  • Loading branch information
aaronlehmann committed May 10, 2017
1 parent 8711c5a commit 4303643
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 0 deletions.
7 changes: 7 additions & 0 deletions manager/controlapi/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,13 @@ func (s *Server) checkPortConflicts(spec *api.ServiceSpec, serviceID string) err
}
if service.Spec.Endpoint != nil {
for _, pc := range service.Spec.Endpoint.Ports {
// If the publish mode is host, it is better to leave the collision check
// to the scheduler, given multiple services with same port in host publish
// mode can coexist.
if pc.PublishMode == api.PublishModeHost {
continue
}

if reqPorts[pcToString(pc)] {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s)", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
}
Expand Down
17 changes: 17 additions & 0 deletions manager/controlapi/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,23 @@ func TestCreateService(t *testing.T) {
}}
_, err = ts.Client.CreateService(context.Background(), &api.CreateServiceRequest{Spec: spec2})
assert.NoError(t, err)

// ensure no port conflict when host ports are used
spec = createSpec("name8", "image", 1)
spec.Endpoint = &api.EndpointSpec{Ports: []*api.PortConfig{
{PublishMode: api.PublishModeHost, PublishedPort: uint32(9101), TargetPort: uint32(9101), Protocol: api.PortConfig_Protocol(api.ProtocolTCP)},
}}
r, err = ts.Client.CreateService(context.Background(), &api.CreateServiceRequest{Spec: spec})
assert.NoError(t, err)
assert.NotEmpty(t, r.Service.ID)

spec2 = createSpec("name9", "image", 1)
spec2.Endpoint = &api.EndpointSpec{Ports: []*api.PortConfig{
{PublishMode: api.PublishModeHost, PublishedPort: uint32(9101), TargetPort: uint32(9101), Protocol: api.PortConfig_Protocol(api.ProtocolTCP)},
}}
_, err = ts.Client.CreateService(context.Background(), &api.CreateServiceRequest{Spec: spec2})
assert.NoError(t, err)

}

func TestSecretValidation(t *testing.T) {
Expand Down
41 changes: 41 additions & 0 deletions manager/scheduler/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,44 @@ func (f *PlatformFilter) Explain(nodes int) string {
}
return fmt.Sprintf("unsupported platform on %d nodes", nodes)
}

// HostPortFilter checks that the node has a specific port available.
type HostPortFilter struct {
t *api.Task
}

// SetTask returns true when the filter is enabled for a given task.
func (f *HostPortFilter) SetTask(t *api.Task) bool {
if t.Endpoint != nil {
for _, port := range t.Endpoint.Ports {
if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
f.t = t
return true
}
}
}

return false
}

// Check returns true if the task can be scheduled into the given node.
func (f *HostPortFilter) Check(n *NodeInfo) bool {
for _, port := range f.t.Endpoint.Ports {
if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
portSpec := hostPortSpec{protocol: port.Protocol, publishedPort: port.PublishedPort}
if _, ok := n.usedHostPorts[portSpec]; ok {
return false
}
}
}

return true
}

// Explain returns an explanation of a failure.
func (f *HostPortFilter) Explain(nodes int) string {
if nodes == 1 {
return "host-mode port already in use on 1 node"
}
return fmt.Sprintf("host-mode port already in use on %d nodes", nodes)
}
26 changes: 26 additions & 0 deletions manager/scheduler/nodeinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@ import (
"golang.org/x/net/context"
)

// hostPortSpec specifies a used host port.
type hostPortSpec struct {
protocol api.PortConfig_Protocol
publishedPort uint32
}

// NodeInfo contains a node and some additional metadata.
type NodeInfo struct {
*api.Node
Tasks map[string]*api.Task
ActiveTasksCount int
ActiveTasksCountByService map[string]int
AvailableResources api.Resources
usedHostPorts map[hostPortSpec]struct{}

// recentFailures is a map from service ID to the timestamps of the
// most recent failures the node has experienced from replicas of that
Expand All @@ -30,6 +37,7 @@ func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api
Tasks: make(map[string]*api.Task),
ActiveTasksCountByService: make(map[string]int),
AvailableResources: availableResources,
usedHostPorts: make(map[hostPortSpec]struct{}),
recentFailures: make(map[string][]time.Time),
}

Expand Down Expand Up @@ -57,6 +65,15 @@ func (nodeInfo *NodeInfo) removeTask(t *api.Task) bool {
nodeInfo.AvailableResources.MemoryBytes += reservations.MemoryBytes
nodeInfo.AvailableResources.NanoCPUs += reservations.NanoCPUs

if t.Endpoint != nil {
for _, port := range t.Endpoint.Ports {
if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
portSpec := hostPortSpec{protocol: port.Protocol, publishedPort: port.PublishedPort}
delete(nodeInfo.usedHostPorts, portSpec)
}
}
}

return true
}

Expand Down Expand Up @@ -84,6 +101,15 @@ func (nodeInfo *NodeInfo) addTask(t *api.Task) bool {
nodeInfo.AvailableResources.MemoryBytes -= reservations.MemoryBytes
nodeInfo.AvailableResources.NanoCPUs -= reservations.NanoCPUs

if t.Endpoint != nil {
for _, port := range t.Endpoint.Ports {
if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
portSpec := hostPortSpec{protocol: port.Protocol, publishedPort: port.PublishedPort}
nodeInfo.usedHostPorts[portSpec] = struct{}{}
}
}
}

if t.DesiredState <= api.TaskStateRunning {
nodeInfo.ActiveTasksCount++
nodeInfo.ActiveTasksCountByService[t.ServiceID]++
Expand Down
1 change: 1 addition & 0 deletions manager/scheduler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var (
&PluginFilter{},
&ConstraintFilter{},
&PlatformFilter{},
&HostPortFilter{},
}
)

Expand Down
143 changes: 143 additions & 0 deletions manager/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2438,3 +2438,146 @@ func benchScheduler(b *testing.B, nodes, tasks int, networkConstraints bool) {
s.Close()
}
}

func TestSchedulerHostPort(t *testing.T) {
ctx := context.Background()
node1 := &api.Node{
ID: "nodeid1",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node1",
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
}
node2 := &api.Node{
ID: "nodeid2",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node2",
},
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
}

task1 := &api.Task{
ID: "id1",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
},
ServiceAnnotations: api.Annotations{
Name: "name1",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
Endpoint: &api.Endpoint{
Ports: []*api.PortConfig{
{
PublishMode: api.PublishModeHost,
PublishedPort: 58,
Protocol: api.ProtocolTCP,
},
},
},
}
task2 := &api.Task{
ID: "id2",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
},
ServiceAnnotations: api.Annotations{
Name: "name2",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
Endpoint: &api.Endpoint{
Ports: []*api.PortConfig{
{
PublishMode: api.PublishModeHost,
PublishedPort: 58,
Protocol: api.ProtocolUDP,
},
},
},
}
task3 := &api.Task{
ID: "id3",
DesiredState: api.TaskStateRunning,
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{},
},
},
ServiceAnnotations: api.Annotations{
Name: "name3",
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
Endpoint: &api.Endpoint{
Ports: []*api.PortConfig{
{
PublishMode: api.PublishModeHost,
PublishedPort: 58,
Protocol: api.ProtocolUDP,
},
{
PublishMode: api.PublishModeHost,
PublishedPort: 58,
Protocol: api.ProtocolTCP,
},
},
},
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

err := s.Update(func(tx store.Tx) error {
// Add initial node and task
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateTask(tx, task2))
assert.NoError(t, store.CreateNode(tx, node1))
assert.NoError(t, store.CreateNode(tx, node2))
return nil
})
assert.NoError(t, err)

scheduler := New(s)

watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
defer cancel()

go func() {
assert.NoError(t, scheduler.Run(ctx))
}()
defer scheduler.Stop()

// Tasks 1 and 2 should be assigned to different nodes.
assignment1 := watchAssignment(t, watch)
assignment2 := watchAssignment(t, watch)
assert.True(t, assignment1 != assignment2)

// Task 3 should not be schedulable.
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, task3))
return nil
})
assert.NoError(t, err)

failure := watchAssignmentFailure(t, watch)
assert.Equal(t, "no suitable node (host-mode port already in use on 2 nodes)", failure.Status.Message)
}

0 comments on commit 4303643

Please sign in to comment.