Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
343 changes: 343 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,343 @@
package libnetwork

import (
"fmt"
"net"
"os"
"strings"

"github.com/Sirupsen/logrus"
"github.com/docker/go-events"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/networkdb"
)

type agent struct {
networkDB *networkdb.NetworkDB
bindAddr string
epTblCancel func()
driverCancelFuncs map[string][]func()
}

func getBindAddr(ifaceName string) (string, error) {
iface, err := net.InterfaceByName(ifaceName)
if err != nil {
return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
}

addrs, err := iface.Addrs()
if err != nil {
return "", fmt.Errorf("failed to get interface addresses: %v", err)
}

for _, a := range addrs {
addr, ok := a.(*net.IPNet)
if !ok {
continue
}
addrIP := addr.IP

if addrIP.IsLinkLocalUnicast() {
continue
}

return addrIP.String(), nil
}

return "", fmt.Errorf("failed to get bind address")
}

func resolveAddr(addrOrInterface string) (string, error) {
// Try and see if this is a valid IP address
if net.ParseIP(addrOrInterface) != nil {
return addrOrInterface, nil
}

// If not a valid IP address, it should be a valid interface
return getBindAddr(addrOrInterface)
}

func (c *controller) agentInit(bindAddrOrInterface string) error {
if !c.cfg.Daemon.IsAgent {
return nil
}

bindAddr, err := resolveAddr(bindAddrOrInterface)
if err != nil {
return err
}

hostname, _ := os.Hostname()
nDB, err := networkdb.New(&networkdb.Config{
BindAddr: bindAddr,
NodeName: hostname,
})

if err != nil {
return err
}

ch, cancel := nDB.Watch("endpoint_table", "", "")

c.agent = &agent{
networkDB: nDB,
bindAddr: bindAddr,
epTblCancel: cancel,
driverCancelFuncs: make(map[string][]func()),
}

go c.handleTableEvents(ch, c.handleEpTableEvent)
return nil
}

func (c *controller) agentJoin(remotes []string) error {
if c.agent == nil {
return nil
}

return c.agent.networkDB.Join(remotes)
}

func (c *controller) agentDriverNotify(d driverapi.Driver) {
if c.agent == nil {
return
}

d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
Address: c.agent.bindAddr,
Self: true,
})
}

func (c *controller) agentClose() {
if c.agent == nil {
return
}

for _, cancelFuncs := range c.agent.driverCancelFuncs {
for _, cancel := range cancelFuncs {
cancel()
}
}
c.agent.epTblCancel()

c.agent.networkDB.Close()
}

func (n *network) isClusterEligible() bool {
if n.driverScope() != datastore.GlobalScope {
return false
}

c := n.getController()
if c.agent == nil {
return false
}

return true
}

func (n *network) joinCluster() error {
if !n.isClusterEligible() {
return nil
}

c := n.getController()
return c.agent.networkDB.JoinNetwork(n.ID())
}

func (n *network) leaveCluster() error {
if !n.isClusterEligible() {
return nil
}

c := n.getController()
return c.agent.networkDB.LeaveNetwork(n.ID())
}

func (ep *endpoint) addToCluster() error {
n := ep.getNetwork()
if !n.isClusterEligible() {
return nil
}

c := n.getController()
if !ep.isAnonymous() && ep.Iface().Address() != nil {
if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), []byte(fmt.Sprintf("%s=%s", ep.Name(), ep.Iface().Address().IP))); err != nil {
return err
}
}

for _, te := range ep.joinInfo.driverTableEntries {
if err := c.agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
return err
}
}

return nil
}

func (ep *endpoint) deleteFromCluster() error {
n := ep.getNetwork()
if !n.isClusterEligible() {
return nil
}

c := n.getController()
if !ep.isAnonymous() {
if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
return err
}
}

if ep.joinInfo == nil {
return nil
}

for _, te := range ep.joinInfo.driverTableEntries {
if err := c.agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
return err
}
}

return nil
}

func (n *network) addDriverWatches() {
if !n.isClusterEligible() {
return
}

c := n.getController()
for _, tableName := range n.driverTables {
ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "")
c.Lock()
c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel)
c.Unlock()

go c.handleTableEvents(ch, n.handleDriverTableEvent)
d, err := n.driver(false)
if err != nil {
logrus.Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
return
}

c.agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
d.EventNotify(driverapi.Create, n.ID(), tableName, key, value)
return false
})
}
}

func (n *network) cancelDriverWatches() {
if !n.isClusterEligible() {
return
}

c := n.getController()
c.Lock()
cancelFuncs := c.agent.driverCancelFuncs[n.ID()]
delete(c.agent.driverCancelFuncs, n.ID())
c.Unlock()

for _, cancel := range cancelFuncs {
cancel()
}
}

func (c *controller) handleTableEvents(ch chan events.Event, fn func(events.Event)) {
for {
select {
case ev, ok := <-ch:
if !ok {
return
}

fn(ev)
}
}
}

func (n *network) handleDriverTableEvent(ev events.Event) {
d, err := n.driver(false)
if err != nil {
logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
return
}

var (
etype driverapi.EventType
tname string
key string
value []byte
)

switch event := ev.(type) {
case networkdb.CreateEvent:
tname = event.Table
key = event.Key
value = event.Value
etype = driverapi.Create
case networkdb.DeleteEvent:
tname = event.Table
key = event.Key
value = event.Value
etype = driverapi.Delete
case networkdb.UpdateEvent:
tname = event.Table
key = event.Key
value = event.Value
etype = driverapi.Delete
}

d.EventNotify(etype, n.ID(), tname, key, value)
}

func (c *controller) handleEpTableEvent(ev events.Event) {
var (
id string
value string
isAdd bool
)

switch event := ev.(type) {
case networkdb.CreateEvent:
id = event.NetworkID
value = string(event.Value)
isAdd = true
case networkdb.DeleteEvent:
id = event.NetworkID
value = string(event.Value)
case networkdb.UpdateEvent:
logrus.Errorf("Unexpected update service table event = %#v", event)
}

nw, err := c.NetworkByID(id)
if err != nil {
logrus.Errorf("Could not find network %s while handling service table event: %v", id, err)
return
}
n := nw.(*network)

pair := strings.Split(value, "=")
if len(pair) < 2 {
logrus.Errorf("Incorrect service table value = %s", value)
return
}

name := pair[0]
ip := net.ParseIP(pair[1])

if name == "" || ip == nil {
logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
return
}

if isAdd {
n.addSvcRecords(name, ip, nil, true)
} else {
n.deleteSvcRecords(name, ip, nil, true)
}
}
13 changes: 12 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,17 @@ func procCreateNetwork(c libnetwork.NetworkController, vars map[string]string, b
if len(create.DriverOpts) > 0 {
options = append(options, libnetwork.NetworkOptionDriverOpts(create.DriverOpts))
}
nw, err := c.NewNetwork(create.NetworkType, create.Name, "", options...)

if len(create.IPv4Conf) > 0 {
ipamV4Conf := &libnetwork.IpamConf{
PreferredPool: create.IPv4Conf[0].PreferredPool,
SubPool: create.IPv4Conf[0].SubPool,
}

options = append(options, libnetwork.NetworkOptionIpam("default", "", []*libnetwork.IpamConf{ipamV4Conf}, nil, nil))
}

nw, err := c.NewNetwork(create.NetworkType, create.Name, create.ID, options...)
if err != nil {
return nil, convertNetworkError(err)
}
Expand Down Expand Up @@ -697,6 +707,7 @@ func procAttachBackend(c libnetwork.NetworkController, vars map[string]string, b
if err != nil {
return nil, convertNetworkError(err)
}

return sb.Key(), &successResponse
}

Expand Down
9 changes: 9 additions & 0 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,19 @@ type sandboxResource struct {
Body types
************/

type ipamConf struct {
PreferredPool string
SubPool string
Gateway string
AuxAddresses map[string]string
}

// networkCreate is the expected body of the "create network" http request message
type networkCreate struct {
Name string `json:"name"`
ID string `json:"id"`
NetworkType string `json:"network_type"`
IPv4Conf []ipamConf `json:"ipv4_configuration"`
DriverOpts map[string]string `json:"driver_opts"`
NetworkOpts map[string]string `json:"network_opts"`
}
Expand Down
Loading