Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
brb committed Oct 19, 2016
1 parent 71f8bb7 commit dc18c38
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 148 deletions.
164 changes: 86 additions & 78 deletions common/utils.go
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"net"
"os"
"regexp"
"strconv"
"strings"

"github.com/vishvananda/netlink"
Expand All @@ -27,56 +29,46 @@ func ErrorMessages(errors []error) string {
return strings.Join(result, "\n")
}

var netdevRegExp = regexp.MustCompile(`^([^ ]+?) ([^ ]+?) \[([^]]*)\]$`)

type NetDev struct {
Name string
MAC net.HardwareAddr
CIDRs []*net.IPNet
}

// Search the network namespace of a process for interfaces matching a predicate
// Note that the predicate is called while the goroutine is inside the process' netns
func FindNetDevs(processID int, match func(link netlink.Link) bool) ([]NetDev, error) {
var netDevs []NetDev
func (d NetDev) String() string {
return fmt.Sprintf("%s %s %s", d.Name, d.MAC, d.CIDRs)
}

ns, err := netns.GetFromPid(processID)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
func ParseNetDev(netdev string) (NetDev, error) {
match := netdevRegExp.FindStringSubmatch(netdev)
if match == nil {
return NetDev{}, fmt.Errorf("invalid netdev: %s", netdev)
}
defer ns.Close()

err = weavenet.WithNetNSUnsafe(ns, func() error {
return forEachLink(func(link netlink.Link) error {
if match(link) {
netDev, err := linkToNetDev(link)
if err != nil {
return err
}
netDevs = append(netDevs, netDev)
}
return nil
})
})

return netDevs, err
}

func forEachLink(f func(netlink.Link) error) error {
links, err := netlink.LinkList()
iface := match[1]
mac, err := net.ParseMAC(match[2])
if err != nil {
return err
return NetDev{}, fmt.Errorf("cannot parse mac %s: %s", match[2], err)
}
for _, link := range links {
if err := f(link); err != nil {
return err

var cidrs []*net.IPNet
for _, cidr := range strings.Split(match[3], " ") {
if cidr != "" {
ip, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
return NetDev{}, fmt.Errorf("cannot parse cidr %s: %s", cidr, err)
}
ipnet.IP = ip
cidrs = append(cidrs, ipnet)
}
}
return nil

return NetDev{Name: iface, MAC: mac, CIDRs: cidrs}, nil
}

func linkToNetDev(link netlink.Link) (NetDev, error) {
func LinkToNetDev(link netlink.Link) (NetDev, error) {
addrs, err := netlink.AddrList(link, netlink.FAMILY_V4)
if err != nil {
return NetDev{}, err
Expand All @@ -89,71 +81,87 @@ func linkToNetDev(link netlink.Link) (NetDev, error) {
return netDev, nil
}

// ConnectedToBridgePredicate returns a function which is used to query whether
// a given link is a veth interface which one end is connected to a bridge.
// The returned function should be called from a container network namespace which
// the bridge does NOT belong to.
func ConnectedToBridgePredicate(bridgeName string) (func(link netlink.Link) bool, error) {
indexes := make(map[int]struct{})

// Scan devices in root namespace to find those attached to weave bridge
err := weavenet.WithNetNSLinkByPidUnsafe(1, bridgeName,
func(br netlink.Link) error {
return forEachLink(func(link netlink.Link) error {
if link.Attrs().MasterIndex == br.Attrs().Index {
peerIndex := link.Attrs().ParentIndex
if peerIndex == 0 {
// perhaps running on an older kernel where ParentIndex doesn't work.
// as fall-back, assume the indexes are consecutive
peerIndex = link.Attrs().Index - 1
}
indexes[peerIndex] = struct{}{}
}
return nil
})
})
// ConnectedToBridgeVethPeerIds returns peer indexes of veth links connected to
// the given bridge. The peer index is used to query from a container netns
// whether the container is connected to the bridge.
func ConnectedToBridgeVethPeerIds(bridgeName string) ([]int, error) {
var ids []int

br, err := netlink.LinkByName(bridgeName)
if err != nil {
return nil, err
}
links, err := netlink.LinkList()
if err != nil {
return nil, err
}

for _, link := range links {
if _, isveth := link.(*netlink.Veth); isveth && link.Attrs().MasterIndex == br.Attrs().Index {
peerID := link.Attrs().ParentIndex
if peerID == 0 {
// perhaps running on an older kernel where ParentIndex doesn't work.
// as fall-back, assume the peers are consecutive
peerID = link.Attrs().Index - 1
}
ids = append(ids, peerID)
}
}

return ids, nil
}

// Lookup the weave interface of a container
func GetWeaveNetDevs(processID int) ([]NetDev, error) {
peerIDs, err := ConnectedToBridgeVethPeerIds("weave")
if err != nil {
return nil, err
}

return func(link netlink.Link) bool {
_, isveth := link.(*netlink.Veth)
_, found := indexes[link.Attrs().Index]
return isveth && found
}, nil
return GetNetDevsByVethPeerIds(processID, peerIDs)
}

func GetNetDevsWithPredicate(processID int, predicate func(link netlink.Link) bool) ([]NetDev, error) {
// Bail out if this process is running in the root namespace
nsToplevel, err := netns.GetFromPid(1)
func GetNetDevsByVethPeerIds(processID int, peerIDs []int) ([]NetDev, error) {
// Bail out if this container is running in the root namespace
netnsRoot, err := netns.GetFromPid(1)
if err != nil {
return nil, fmt.Errorf("unable to open root namespace: %s", err)
}
defer nsToplevel.Close()
nsContainr, err := netns.GetFromPid(processID)
defer netnsRoot.Close()
netnsContainer, err := netns.GetFromPid(processID)
if err != nil {
// Unable to find a namespace for this process - just return nothing
if os.IsNotExist(err) {
return nil, nil
}
return nil, fmt.Errorf("unable to open process %d namespace: %s", processID, err)
}
defer nsContainr.Close()
if nsToplevel.Equal(nsContainr) {
defer netnsContainer.Close()
if netnsRoot.Equal(netnsContainer) {
return nil, nil
}

return FindNetDevs(processID, predicate)
}
var netdevs []NetDev

// Lookup the weave interface of a container
func GetWeaveNetDevs(processID int) ([]NetDev, error) {
p, err := ConnectedToBridgePredicate("weave")
peersStr := make([]string, len(peerIDs))
for i, id := range peerIDs {
peersStr[i] = strconv.Itoa(id)
}
netdevsStr, err := weavenet.WithNetNSByPid(processID, "list-netdevs", strings.Join(peersStr, ","))
if err != nil {
return nil, err
return nil, fmt.Errorf("list-netdevs failed: %s", err)
}
for _, netdevStr := range strings.Split(netdevsStr, "\n") {
if netdevStr != "" {
netdev, err := ParseNetDev(netdevStr)
if err != nil {
return nil, fmt.Errorf("cannot parse netdev %s: %s", netdevStr, err)
}
netdevs = append(netdevs, netdev)
}
}

return GetNetDevsWithPredicate(processID, p)
return netdevs, nil
}

// Get the weave bridge interface.
Expand All @@ -164,5 +172,5 @@ func GetBridgeNetDev(bridgeName string) (NetDev, error) {
return NetDev{}, err
}

return linkToNetDev(link)
return LinkToNetDev(link)
}
12 changes: 2 additions & 10 deletions net/netns.go
Expand Up @@ -57,18 +57,10 @@ func WithNetNSLinkUnsafe(ns netns.NsHandle, ifName string, work func(link netlin
})
}

func WithNetNSLinkByPidUnsafe(pid int, ifName string, work func(link netlink.Link) error) error {
ns, err := netns.GetFromPid(pid)
if err != nil {
return err
}
defer ns.Close()

return WithNetNSLinkUnsafe(ns, ifName, work)
}

// A safe version of WithNetNS* which creates a process executing
// "nsenter --net=<ns-path> weaveutil <cmd> [args]".
//
// TODO(mp) Fix (indirect) circular dependency (weaveutil -> net -> weaveutil)
func WithNetNS(nsPath string, cmd string, args ...string) (string, error) {
var stdout, stderr bytes.Buffer

Expand Down
42 changes: 22 additions & 20 deletions net/veth.go
Expand Up @@ -105,21 +105,26 @@ const (
vethPrefix = "v" + VethName // starts with "veth" to suppress UI notifications
)

func interfaceExistsInNamespace(ns netns.NsHandle, ifName string) bool {
err := WithNetNSUnsafe(ns, func() error {
_, err := netlink.LinkByName(ifName)
return err
})
func interfaceExistsInNamespace(netNSPath string, ifName string) bool {
_, err := WithNetNS(netNSPath, "check-iface", ifName)
return err == nil
}

func AttachContainer(ns netns.NsHandle, id, ifName, bridgeName string, mtu int, withMulticastRoute bool, cidrs []*net.IPNet, keepTXOn bool) error {
// NB: This function can be used only by a process that terminates immediately
// after calling the function as it changes netns via WithNetNSLinkUnsafe.
func AttachContainer(netNSPath, id, ifName, bridgeName string, mtu int, withMulticastRoute bool, cidrs []*net.IPNet, keepTXOn bool) error {
ns, err := netns.GetFromPath(netNSPath)
if err != nil {
return err
}
defer ns.Close()

ipt, err := iptables.New()
if err != nil {
return err
}

if !interfaceExistsInNamespace(ns, ifName) {
if !interfaceExistsInNamespace(netNSPath, ifName) {
maxIDLen := IFNAMSIZ - 1 - len(vethPrefix+"pl")
if len(id) > maxIDLen {
id = id[:maxIDLen] // trim passed ID if too long
Expand All @@ -129,18 +134,7 @@ func AttachContainer(ns netns.NsHandle, id, ifName, bridgeName string, mtu int,
if err := netlink.LinkSetNsFd(veth, int(ns)); err != nil {
return fmt.Errorf("failed to move veth to container netns: %s", err)
}
if err := WithNetNSUnsafe(ns, func() error {
if err := netlink.LinkSetName(veth, ifName); err != nil {
return err
}
if err := ConfigureARPCache(ifName); err != nil {
return err
}
if err := ipt.Append("filter", "INPUT", "-i", ifName, "-d", "224.0.0.0/4", "-j", "DROP"); err != nil {
return err
}
return nil
}); err != nil {
if _, err := WithNetNS(netNSPath, "setup-iface", peerName, ifName); err != nil {
return fmt.Errorf("error setting up interface: %s", err)
}
return nil
Expand Down Expand Up @@ -201,7 +195,15 @@ func AttachContainer(ns netns.NsHandle, id, ifName, bridgeName string, mtu int,
return nil
}

func DetachContainer(ns netns.NsHandle, id, ifName string, cidrs []*net.IPNet) error {
// NB: This function can be used only by a process that terminates immediately
// after calling the function as it changes netns via WithNetNSLinkUnsafe.
func DetachContainer(netNSPath, id, ifName string, cidrs []*net.IPNet) error {
ns, err := netns.GetFromPath(netNSPath)
if err != nil {
return err
}
defer ns.Close()

ipt, err := iptables.New()
if err != nil {
return err
Expand Down
16 changes: 2 additions & 14 deletions plugin/net/cni.go
Expand Up @@ -107,7 +107,7 @@ func (c *CNIPlugin) CmdAdd(args *skel.CmdArgs) error {
id = fmt.Sprintf("%x", data)
}

if err := weavenet.AttachContainer(ns, id, args.IfName, conf.BrName, conf.MTU, false, []*net.IPNet{&result.IP4.IP}, false); err != nil {
if err := weavenet.AttachContainer(args.Netns, id, args.IfName, conf.BrName, conf.MTU, false, []*net.IPNet{&result.IP4.IP}, false); err != nil {
return err
}
if err := weavenet.WithNetNSLinkUnsafe(ns, args.IfName, func(link netlink.Link) error {
Expand Down Expand Up @@ -181,19 +181,7 @@ func (c *CNIPlugin) CmdDel(args *skel.CmdArgs) error {
return err
}

ns, err := netns.GetFromPath(args.Netns)
if err != nil {
return err
}
defer ns.Close()
err = weavenet.WithNetNSUnsafe(ns, func() error {
link, err := netlink.LinkByName(args.IfName)
if err != nil {
return err
}
return netlink.LinkDel(link)
})
if err != nil {
if _, err = weavenet.WithNetNS(args.Netns, "del-iface", args.IfName); err != nil {
return fmt.Errorf("error removing interface: %s", err)
}

Expand Down
12 changes: 4 additions & 8 deletions prog/weaveutil/addrs.go
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"

"github.com/fsouza/go-dockerclient"
"github.com/vishvananda/netlink"

"github.com/weaveworks/weave/common"
weavenet "github.com/weaveworks/weave/net"
Expand All @@ -21,7 +20,7 @@ func containerAddrs(args []string) error {
return err
}

pred, err := common.ConnectedToBridgePredicate(bridgeName)
peerIDs, err := common.ConnectedToBridgeVethPeerIds(bridgeName)
if err != nil {
if err == weavenet.ErrLinkNotFound {
return nil
Expand Down Expand Up @@ -51,11 +50,8 @@ func containerAddrs(args []string) error {
containerIDs = append(containerIDs, cid)
}

// NB: Because network namespaces (netns) are changed many times inside the loop,
// it's NOT safe to exec any code depending on the root netns without
// wrapping with WithNetNS*.
for _, cid := range containerIDs {
netDevs, err := getNetDevs(client, containers[cid], pred)
netDevs, err := getNetDevs(client, containers[cid], peerIDs)
if err != nil {
return err
}
Expand All @@ -65,11 +61,11 @@ func containerAddrs(args []string) error {
return nil
}

func getNetDevs(c *docker.Client, container *docker.Container, pred func(netlink.Link) bool) ([]common.NetDev, error) {
func getNetDevs(c *docker.Client, container *docker.Container, peerIDs []int) ([]common.NetDev, error) {
if container.State.Pid == 0 {
return nil, nil
}
return common.GetNetDevsWithPredicate(container.State.Pid, pred)
return common.GetNetDevsByVethPeerIds(container.State.Pid, peerIDs)
}

func printNetDevs(cid string, netDevs []common.NetDev) {
Expand Down

0 comments on commit dc18c38

Please sign in to comment.