This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

network: Add support for address sets

As spark scaled, the number of ACLs that OVN had to process grew at
O(n^2) as the number of workers grew.  At a scale of 64 nodes this
would cause OVN to spend several minutes processing ACLs before the
network would work properly.

This patch dramatically reduces the number of ACLs by adding support
for address sets to the Kelda Daemon.  Address sets allow ACLs to be
express groups of IP addresses, instead of requiring them to be
exploded out into a separate ACL per IP.

This patch adds full support for address sets to the deployment
engine, and partial support to the Javascript bindings.  Even with
partial support for address sets, ACLs are no longer a bottleneck when
running Spark.  However, full support will need to be added later if
we want to scale to large numbers of containers.
  • Loading branch information...
ejj committed Nov 2, 2017
1 parent 7435cd0 commit 8202c5f9617ea3270662fba341ea86db023d73c2
View
@@ -83,13 +83,13 @@ type LoadBalancer struct {
Hostnames []string `json:",omitempty"`
}
// A Connection allows the container with the `From` hostname to speak to the container
// with the `To` hostname in ports in the range [MinPort, MaxPort]
// A Connection allows any container whose hostname appears in `From` to speak with any
// container whose hostname appears in `To` using ports in the range [MinPort, MaxPort]
type Connection struct {
From string `json:",omitempty"`
To string `json:",omitempty"`
MinPort int `json:",omitempty"`
MaxPort int `json:",omitempty"`
From []string `json:",omitempty"`
To []string `json:",omitempty"`
MinPort int `json:",omitempty"`
MaxPort int `json:",omitempty"`
}
// A ConnectionSlice allows for slices of Collections to be used in joins
@@ -2,6 +2,7 @@ package inspect
import (
"fmt"
"github.com/kelda/kelda/blueprint"
)
@@ -59,18 +60,22 @@ func (g Graph) GetConnections() []Edge {
return res
}
func (g *Graph) addConnection(from string, to string) error {
fromNode, ok := g.Nodes[from]
if !ok {
return fmt.Errorf("no node: %s", from)
}
func (g *Graph) addConnection(fromSlice, toSlice []string) error {
for _, from := range fromSlice {
for _, to := range toSlice {
fromNode, ok := g.Nodes[from]
if !ok {
return fmt.Errorf("no node: %s", from)
}
toNode, ok := g.Nodes[to]
if !ok {
return fmt.Errorf("no node: %s", to)
}
toNode, ok := g.Nodes[to]
if !ok {
return fmt.Errorf("no node: %s", to)
}
fromNode.Connections[to] = toNode
fromNode.Connections[to] = toNode
}
}
return nil
}
@@ -63,8 +63,10 @@ func TestViz(t *testing.T) {
},
},
Connections: []blueprint.Connection{
{From: "a", To: "b", MinPort: 22, MaxPort: 22},
{From: "b", To: "c", MinPort: 22, MaxPort: 22},
{From: []string{"a"}, To: []string{"b"},
MinPort: 22, MaxPort: 22},
{From: []string{"b"}, To: []string{"c"},
MinPort: 22, MaxPort: 22},
},
}
View
@@ -14,6 +14,7 @@ import (
"github.com/kelda/kelda/blueprint"
"github.com/kelda/kelda/db"
"github.com/kelda/kelda/util"
"github.com/kelda/kelda/util/str"
)
// An arbitrary length to truncate container commands to.
@@ -217,15 +218,18 @@ func writeContainers(fd io.Writer, containers []db.Container, machines []db.Mach
func connToPorts(connections []db.Connection) map[string][]string {
hostnamePublicPorts := map[string][]string{}
for _, c := range connections {
if c.From != blueprint.PublicInternetLabel {
if !str.SliceContains(c.From, blueprint.PublicInternetLabel) {
continue
}
portStr := fmt.Sprintf("%d", c.MinPort)
if c.MinPort != c.MaxPort {
portStr += fmt.Sprintf("-%d", c.MaxPort)
for _, to := range c.To {
portStr := fmt.Sprintf("%d", c.MinPort)
if c.MinPort != c.MaxPort {
portStr += fmt.Sprintf("-%d", c.MaxPort)
}
hostnamePublicPorts[to] = append(hostnamePublicPorts[to],
portStr)
}
hostnamePublicPorts[c.To] = append(hostnamePublicPorts[c.To], portStr)
}
return hostnamePublicPorts
}
View
@@ -171,10 +171,14 @@ func TestContainerOutput(t *testing.T) {
{CloudID: "7", PrivateIP: ""},
}
connections := []db.Connection{
{ID: 1, From: "public", To: "frompublic1", MinPort: 80, MaxPort: 80},
{ID: 1, From: "public", To: "frompublic2", MinPort: 80, MaxPort: 80},
{ID: 1, From: "public", To: "frompublic3", MinPort: 80, MaxPort: 80},
{ID: 2, From: "notpublic", To: "frompublic1", MinPort: 100, MaxPort: 101},
{ID: 1, From: []string{"public"}, To: []string{"frompublic1"},
MinPort: 80, MaxPort: 80},
{ID: 1, From: []string{"public", "notpublic"},
To: []string{"frompublic2"}, MinPort: 80, MaxPort: 80},
{ID: 1, From: []string{"public"}, To: []string{"frompublic3"},
MinPort: 80, MaxPort: 80},
{ID: 2, From: []string{"notpublic", "frompublic2"},
To: []string{"frompublic1"}, MinPort: 100, MaxPort: 101},
}
expected := `CONTAINER____MACHINE____COMMAND___________HOSTNAME_______` +
@@ -273,8 +277,10 @@ ________________________________________________________________________________
{CloudID: "5", PublicIP: "7.7.7.7", PrivateIP: "1.1.1.1"},
}
connections = []db.Connection{
{ID: 1, From: "public", To: "frompub", MinPort: 80, MaxPort: 80},
{ID: 2, From: "public", To: "frompub", MinPort: 100, MaxPort: 101},
{ID: 1, From: []string{"public"}, To: []string{"frompub"},
MinPort: 80, MaxPort: 80},
{ID: 2, From: []string{"public"}, To: []string{"frompub"},
MinPort: 100, MaxPort: 101},
}
expected = `CONTAINER____MACHINE____COMMAND____HOSTNAME____STATUS_______` +
View
@@ -9,6 +9,7 @@ import (
"github.com/kelda/kelda/cloud/foreman"
"github.com/kelda/kelda/db"
"github.com/kelda/kelda/join"
"github.com/kelda/kelda/util/str"
log "github.com/sirupsen/logrus"
)
@@ -254,7 +255,7 @@ func (cld cloud) desiredACLs(bp db.Blueprint) map[acl.ACL]struct{} {
}
for _, conn := range bp.Connections {
if conn.From == blueprint.PublicInternetLabel {
if str.SliceContains(conn.From, blueprint.PublicInternetLabel) {
acl := acl.ACL{
CidrIP: "0.0.0.0/0",
MinPort: conn.MinPort,
View
@@ -271,8 +271,8 @@ func TestDesiredACLs(t *testing.T) {
acls = cld.desiredACLs(db.Blueprint{
Blueprint: blueprint.Blueprint{
Connections: []blueprint.Connection{{
From: "foo",
To: "bar",
From: []string{"foo"},
To: []string{"bar"},
MinPort: 5,
MaxPort: 6,
}},
@@ -284,8 +284,8 @@ func TestDesiredACLs(t *testing.T) {
acls = cld.desiredACLs(db.Blueprint{
Blueprint: blueprint.Blueprint{
Connections: []blueprint.Connection{{
From: blueprint.PublicInternetLabel,
To: "bar",
From: []string{blueprint.PublicInternetLabel},
To: []string{"bar"},
MinPort: 1,
MaxPort: 2,
}},
View
@@ -2,15 +2,17 @@ package db
import (
"fmt"
"github.com/kelda/kelda/util/str"
)
// A Connection allows two hostnames to speak to each other on the port
// range [MinPort, MaxPort] inclusive.
type Connection struct {
ID int `json:"-"`
From string
To string
From []string
To []string
MinPort int
MaxPort int
}
@@ -61,11 +63,13 @@ func (c Connection) String() string {
func (c Connection) less(r row) bool {
o := r.(Connection)
if cmp := str.SliceCmp(c.From, o.From); cmp != 0 {
return cmp < 0
} else if cmp := str.SliceCmp(c.To, o.To); cmp != 0 {
return cmp < 0
}
switch {
case c.From != o.From:
return c.From < o.From
case c.To != o.To:
return c.To < o.To
case c.MaxPort != o.MaxPort:
return c.MaxPort < o.MaxPort
case c.MinPort != o.MinPort:
View
@@ -15,7 +15,7 @@ func TestConnection(t *testing.T) {
conn.Txn(ConnectionTable).Run(func(view Database) error {
connection := view.InsertConnection()
id = connection.ID
connection.From = "foo"
connection.From = []string{"foo"}
view.Commit(connection)
return nil
})
@@ -25,18 +25,24 @@ func TestConnection(t *testing.T) {
assert.Equal(t, 1, connections.Len())
connection := connections[0]
assert.Equal(t, "foo", connection.From)
assert.Equal(t, []string{"foo"}, connection.From)
assert.Equal(t, id, connection.getID())
connection.MaxPort = 3
assert.Equal(t, "Connection-1{foo->:0-3}", connection.String())
assert.Equal(t, "Connection-1{[foo]->[]:0-3}", connection.String())
connection.MaxPort = 0
assert.Equal(t, connection, connections.Get(0))
assert.True(t, connection.less(Connection{From: "z"}))
assert.True(t, connection.less(Connection{From: "foo", To: "a"}))
assert.True(t, connection.less(Connection{From: "foo", MaxPort: 1}))
assert.True(t, connection.less(Connection{From: "foo", MinPort: 100}))
assert.True(t, connection.less(Connection{From: "foo", ID: id + 1}))
assert.True(t, connection.less(Connection{From: []string{"z"}}))
assert.True(t, connection.less(Connection{From: []string{"foo"},
To: []string{"a"}}))
assert.True(t, connection.less(Connection{From: []string{"foo"}, MaxPort: 1}))
assert.True(t, connection.less(Connection{From: []string{"foo"}, MinPort: 100}))
assert.True(t, connection.less(Connection{From: []string{"foo"}, ID: id + 1}))
assert.True(t, connection.less(Connection{From: []string{"foo", "bar"}}))
assert.True(t, connection.less(Connection{From: []string{"foo"},
To: []string{"baz", "qux"}}))
}
@@ -41,7 +41,9 @@ func newConnectionTester(clnt client.Client) (connectionTester, error) {
connectionMap := make(map[string][]string)
for _, conn := range connections {
connectionMap[conn.From] = append(connectionMap[conn.From], conn.To)
for _, from := range conn.From {
connectionMap[from] = append(connectionMap[from], conn.To...)
}
}
return connectionTester{
@@ -8,6 +8,7 @@ import (
"github.com/kelda/kelda/blueprint"
"github.com/kelda/kelda/db"
"github.com/kelda/kelda/integration-tester/util"
"github.com/kelda/kelda/util/str"
)
func TestOutboundPublic(t *testing.T) {
@@ -36,9 +37,11 @@ var testHost = fmt.Sprintf("google.com:%d", testPort)
func test(t *testing.T, containers []db.Container, connections []db.Connection) {
connected := map[string]struct{}{}
for _, conn := range connections {
if conn.To == blueprint.PublicInternetLabel &&
if str.SliceContains(conn.To, blueprint.PublicInternetLabel) &&
inRange(testPort, conn.MinPort, conn.MaxPort) {
connected[conn.From] = struct{}{}
for _, from := range conn.From {
connected[from] = struct{}{}
}
}
}
@@ -13,6 +13,7 @@ import (
cliPath "github.com/kelda/kelda/cli/path"
tlsIO "github.com/kelda/kelda/connection/tls/io"
"github.com/kelda/kelda/db"
"github.com/kelda/kelda/util/str"
)
// GetDefaultDaemonClient gets an API client connected to the daemon on the
@@ -34,9 +35,11 @@ func CheckPublicConnections(t *testing.T, machines []db.Machine,
// Map of hostname to its publicly exposed ports.
pubConns := map[string][]int{}
for _, conn := range connections {
if conn.From == "public" {
if str.SliceContains(conn.From, "public") {
for port := conn.MinPort; port <= conn.MaxPort; port++ {
pubConns[conn.To] = append(pubConns[conn.To], port)
for _, to := range conn.To {
pubConns[to] = append(pubConns[to], port)
}
}
}
}
View
@@ -306,7 +306,7 @@ function vet(infrastructure) {
});
infrastructure.connections.forEach((conn) => {
[conn.from, conn.to].forEach((host) => {
conn.from.concat(conn.to).forEach((host) => {
if (!hostnameMap[host]) {
throw new Error(`connection ${stringify(conn)} references ` +
`an undefined hostname: ${host}`);
@@ -396,10 +396,8 @@ class LoadBalancer {
'or list of containers and not from a Load Balancer or other object.');
}
src.forEach((c) => {
this.allowedInboundConnections.push(
new Connection(c, boxRange(portRange)));
});
this.allowedInboundConnections.push(
new Connection(src, boxRange(portRange)));
}
/**
@@ -409,8 +407,8 @@ class LoadBalancer {
*/
getKeldaConnections() {
return this.allowedInboundConnections.map(conn => ({
from: conn.from.hostname,
to: this.name,
from: conn.from.map(f => f.hostname),
to: [this.name],
minPort: conn.minPort,
maxPort: conn.maxPort,
}));
@@ -1228,10 +1226,8 @@ class Container {
'list of containers, and not from a LoadBalancer or other object.');
}
src.forEach((c) => {
this.allowedInboundConnections.push(
new Connection(c, boxRange(portRange)));
});
this.allowedInboundConnections.push(
new Connection(src, boxRange(portRange)));
}
/**
@@ -1290,26 +1286,26 @@ class Container {
this.allowedInboundConnections.forEach((conn) => {
connections.push({
from: conn.from.hostname,
to: this.hostname,
from: conn.from.map(f => f.hostname),
to: [this.hostname],
minPort: conn.minPort,
maxPort: conn.maxPort,
});
});
this.outgoingPublic.forEach((rng) => {
connections.push({
from: this.hostname,
to: publicInternetLabel,
from: [this.hostname],
to: [publicInternetLabel],
minPort: rng.min,
maxPort: rng.max,
});
});
this.incomingPublic.forEach((rng) => {
connections.push({
from: publicInternetLabel,
to: this.hostname,
from: [publicInternetLabel],
to: [this.hostname],
minPort: rng.min,
maxPort: rng.max,
});
@@ -1458,7 +1454,7 @@ class Connection {
* Creates a Connection.
* @constructor
*
* @param {string} from - The host from which connections are allowed.
* @param {string[]} from - A list of hosts that allow connections.
* @param {PortRange} ports - The port numbers which are allowed.
*/
constructor(from, ports) {
Oops, something went wrong.

0 comments on commit 8202c5f

Please sign in to comment.