From 2a501e2cf065de280bbe57affe2fdd02fa4cedc0 Mon Sep 17 00:00:00 2001 From: forrestchen Date: Tue, 22 Oct 2019 13:31:31 +0800 Subject: [PATCH] refactor Signed-off-by: forrestchen --- .gitignore | 1 + cni/k8s-sriov/k8s_sriov.go | 125 ++++++++-------- cni/k8s-vlan/k8s_vlan.go | 96 +++++++----- go.mod | 2 +- go.sum | 4 +- .../schedulerplugin/floatingip_plugin_test.go | 4 +- pkg/ipam/schedulerplugin/resync.go | 2 +- pkg/ipam/schedulerplugin/types.go | 4 +- pkg/ipam/server/server.go | 6 +- pkg/network/vlan/vlan.go | 104 +++++++------ pkg/policy/policy.go | 137 ++++++++++-------- 11 files changed, 268 insertions(+), 217 deletions(-) diff --git a/.gitignore b/.gitignore index 8370c756..c9472115 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ /.idea *.iml /.vscode +/vendor /go /tools/route_monitor/route_monitor /vendor/github.com/docker/distribution/contrib diff --git a/cni/k8s-sriov/k8s_sriov.go b/cni/k8s-sriov/k8s_sriov.go index 827a2f06..89de34d6 100644 --- a/cni/k8s-sriov/k8s_sriov.go +++ b/cni/k8s-sriov/k8s_sriov.go @@ -17,8 +17,8 @@ import ( t020 "github.com/containernetworking/cni/pkg/types/020" "github.com/containernetworking/cni/pkg/version" "github.com/containernetworking/plugins/pkg/ns" - glog "k8s.io/klog" "github.com/vishvananda/netlink" + glog "k8s.io/klog" ) type NetConf struct { @@ -107,112 +107,117 @@ func main() { func setupVF(conf *NetConf, result *t020.Result, podifName string, vlan int, netns ns.NetNS) error { cpus := runtime.NumCPU() ifName := conf.Device - var vfIdx int - var infos []os.FileInfo m, err := netlink.LinkByName(ifName) if err != nil { return fmt.Errorf("failed to lookup master %q: %v", conf.Device, err) } - // get the ifname sriov vf num - vfTotal, err := getSriovNumVfs(ifName, kindTotalVfs) + minVfNum, err := getSriovVfNum(conf) if err != nil { return err } - - if vfTotal <= 0 { - return fmt.Errorf("no virtual function in the device %q", ifName) - } - - vfNums, err := getSriovNumVfs(ifName, kindNumVfs) + infos, vfIdx, vfDev, vfName, err := findAvailableVf(minVfNum, conf) if err != nil { return err } - - minVfNum := min(vfTotal, conf.VFNum) - // only set vf when `sriov_numvfs` is 0 - if vfNums == 0 { - if err := setSriovNumVfs(ifName, minVfNum); err != nil { - return err + if err = netlink.LinkSetVfVlan(m, vfIdx, vlan); err != nil { + return fmt.Errorf("failed to set vf %d vlan: %v", vfIdx, err) + } + if err = netlink.LinkSetUp(vfDev); err != nil { + return fmt.Errorf("failed to setup vf %d device: %v", vfIdx, err) + } + // move VF device to ns + if err = netlink.LinkSetNsFd(vfDev, int(netns.Fd())); err != nil { + return fmt.Errorf("failed to move vf %d to netns: %v", vfIdx, err) + } + if err = netns.Do(func(_ ns.NetNS) error { + err := renameLink(vfName, podifName) + if err != nil { + return fmt.Errorf("failed to rename %d vf of the device %q to %q: %v", vfIdx, vfName, ifName, err) } - } else if vfNums < minVfNum { - glog.Warning("sriov_numvfs is set but small") + return cniutil.ConfigureIface(podifName, result) + }); err != nil { + return err } + hiDir := fmt.Sprintf("/sys/class/net/%s/device/virtfn%d/msi_irqs", ifName, vfIdx) + if infos, err = ioutil.ReadDir(hiDir); err == nil { + for i := range infos { + hiNum, err := strconv.Atoi(infos[i].Name()) + if err == nil { + selectedCPU := fmt.Sprintf("%d", hiNum%cpus) + irqFile := fmt.Sprintf("/proc/irq/%d/smp_affinity_list", hiNum) + if err = ioutil.WriteFile(irqFile, []byte(selectedCPU), 0644); err != nil { + return fmt.Errorf("failed set irq smp affinity: %v", err) + } + } + } + } + return nil +} +func findAvailableVf(minVfNum int, conf *NetConf) ([]os.FileInfo, int, netlink.Link, string, error) { + ifName := conf.Device + var infos []os.FileInfo + var err error + var vfIdx int for vf := 0; vf <= (minVfNum - 1); vf++ { vfDir := fmt.Sprintf("/sys/class/net/%s/device/virtfn%d/net", ifName, vf) if _, err := os.Lstat(vfDir); err != nil { if vf == (minVfNum - 1) { - return fmt.Errorf("failed to open the virtfn%d dir of the device %q: %v", vf, ifName, err) + return nil, 0, nil, "", fmt.Errorf("failed to open the virtfn%d dir of the device %q: %v", vf, ifName, err) } continue } - infos, err = ioutil.ReadDir(vfDir) if err != nil { - return fmt.Errorf("failed to read the virtfn%d dir of the device %q: %v", vf, ifName, err) + return nil, 0, nil, "", fmt.Errorf("failed to read the virtfn%d dir of the device %q: %v", vf, ifName, err) } - if (len(infos) == 0) && (vf == (minVfNum - 1)) { - return fmt.Errorf("no Virtual function exist in directory %s, last vf is virtfn%d", vfDir, vf) + return nil, 0, nil, "", fmt.Errorf("no Virtual function exist in directory %s, last vf is virtfn%d", vfDir, vf) } - if (len(infos) == 0) && (vf != (minVfNum - 1)) { continue } vfIdx = vf break - } // VF NIC name if len(infos) != 1 { - return fmt.Errorf("no virtual network resources available for the %q", conf.Device) + return nil, 0, nil, "", fmt.Errorf("no virtual network resources available for the %q", ifName) } vfName := infos[0].Name() - vfDev, err := netlink.LinkByName(vfName) if err != nil { - return fmt.Errorf("failed to lookup vf device %q: %v", vfName, err) - } - - if err = netlink.LinkSetVfVlan(m, vfIdx, vlan); err != nil { - return fmt.Errorf("failed to set vf %d vlan: %v", vfIdx, err) + return nil, 0, nil, "", fmt.Errorf("failed to lookup vf device %q: %v", vfName, err) } + return infos, vfIdx, vfDev, vfName, nil +} - if err = netlink.LinkSetUp(vfDev); err != nil { - return fmt.Errorf("failed to setup vf %d device: %v", vfIdx, err) +func getSriovVfNum(conf *NetConf) (int, error) { + ifName := conf.Device + vfTotal, err := getSriovNumVfs(ifName, kindTotalVfs) + if err != nil { + return 0, err } - - // move VF device to ns - if err = netlink.LinkSetNsFd(vfDev, int(netns.Fd())); err != nil { - return fmt.Errorf("failed to move vf %d to netns: %v", vfIdx, err) + if vfTotal <= 0 { + return 0, fmt.Errorf("no virtual function in the device %q", ifName) } - - if err = netns.Do(func(_ ns.NetNS) error { - err := renameLink(vfName, podifName) - if err != nil { - return fmt.Errorf("failed to rename %d vf of the device %q to %q: %v", vfIdx, vfName, ifName, err) - } - return cniutil.ConfigureIface(podifName, result) - }); err != nil { - return err + vfNums, err := getSriovNumVfs(ifName, kindNumVfs) + if err != nil { + return 0, err } - hiDir := fmt.Sprintf("/sys/class/net/%s/device/virtfn%d/msi_irqs", ifName, vfIdx) - if infos, err = ioutil.ReadDir(hiDir); err == nil { - for i := range infos { - hiNum, err := strconv.Atoi(infos[i].Name()) - if err == nil { - selectedCPU := fmt.Sprintf("%d", hiNum%cpus) - irqFile := fmt.Sprintf("/proc/irq/%d/smp_affinity_list", hiNum) - if err = ioutil.WriteFile(irqFile, []byte(selectedCPU), 0644); err != nil { - return fmt.Errorf("failed set irq smp affinity: %v", err) - } - } + minVfNum := min(vfTotal, conf.VFNum) + // only set vf when `sriov_numvfs` is 0 + if vfNums == 0 { + if err := setSriovNumVfs(ifName, minVfNum); err != nil { + return 0, err } + } else if vfNums < minVfNum { + glog.Warning("sriov_numvfs is set but small") } - return nil + return minVfNum, nil } func releaseVF(podifName string, netns ns.NetNS) error { diff --git a/cni/k8s-vlan/k8s_vlan.go b/cni/k8s-vlan/k8s_vlan.go index 00d3535b..74ada91d 100644 --- a/cni/k8s-vlan/k8s_vlan.go +++ b/cni/k8s-vlan/k8s_vlan.go @@ -50,24 +50,9 @@ func cmdAdd(args *skel.CmdArgs) error { if err := d.Init(); err != nil { return fmt.Errorf("failed to setup bridge %v", err) } - var result020s []*t020.Result - for i := 0; i < len(results); i++ { - result020, err := t020.GetResult(results[i]) - if err != nil { - return err - } - result020s = append(result020s, result020) - } - if len(result020s) == 2 { - routes := result020s[0].IP4.Routes - for i := 0; i < len(routes); i++ { - if routes[i].Dst.String() == "0.0.0.0/0" { - routes = append(routes[:i], routes[i+1:]...) - break - } - } - routes = append(routes, types.Route{Dst: *pANet}, types.Route{Dst: *pBNet}, types.Route{Dst: *pCNet}) - result020s[0].IP4.Routes = routes + result020s, err := resultConvert(results) + if err != nil { + return err } if d.MacVlanMode() { @@ -88,28 +73,8 @@ func cmdAdd(args *skel.CmdArgs) error { _ = utils.SendGratuitousARP(args.IfName, result020s[0].IP4.IP.IP.String(), args.Netns) } else { ifName := args.IfName - ifIndex := 0 - for i := 0; i < len(result020s); i++ { - vlanId := vlanIds[i] - result020 := result020s[i] - bridgeName, err := d.CreateBridgeAndVlanDevice(vlanId) - if err != nil { - return err - } - suffix := "" - if i != 0 { - suffix = fmt.Sprintf("-%d", i+1) - ifIndex++ - args.IfName = fmt.Sprintf("eth%d", ifIndex) - if args.IfName == ifName { - ifIndex++ - args.IfName = fmt.Sprintf("eth%d", ifIndex) - } - } - if err := utils.VethConnectsHostWithContainer(result020, args, bridgeName, suffix); err != nil { - return err - } - _ = utils.SendGratuitousARP(args.IfName, result020s[0].IP4.IP.IP.String(), args.Netns) + if err := setupVlanDevice(result020s, vlanIds, args); err != nil { + return err } args.IfName = ifName } @@ -122,6 +87,57 @@ func cmdAdd(args *skel.CmdArgs) error { return result020s[0].Print() } +func setupVlanDevice(result020s []*t020.Result, vlanIds []uint16, args *skel.CmdArgs) error { + ifName := args.IfName + ifIndex := 0 + for i := 0; i < len(result020s); i++ { + vlanId := vlanIds[i] + result020 := result020s[i] + bridgeName, err := d.CreateBridgeAndVlanDevice(vlanId) + if err != nil { + return err + } + suffix := "" + if i != 0 { + suffix = fmt.Sprintf("-%d", i+1) + ifIndex++ + args.IfName = fmt.Sprintf("eth%d", ifIndex) + if args.IfName == ifName { + ifIndex++ + args.IfName = fmt.Sprintf("eth%d", ifIndex) + } + } + if err := utils.VethConnectsHostWithContainer(result020, args, bridgeName, suffix); err != nil { + return err + } + _ = utils.SendGratuitousARP(args.IfName, result020s[0].IP4.IP.IP.String(), args.Netns) + } + return nil +} + +func resultConvert(results []types.Result) ([]*t020.Result, error) { + var result020s []*t020.Result + for i := 0; i < len(results); i++ { + result020, err := t020.GetResult(results[i]) + if err != nil { + return nil, err + } + result020s = append(result020s, result020) + } + if len(result020s) == 2 { + routes := result020s[0].IP4.Routes + for i := 0; i < len(routes); i++ { + if routes[i].Dst.String() == "0.0.0.0/0" { + routes = append(routes[:i], routes[i+1:]...) + break + } + } + routes = append(routes, types.Route{Dst: *pANet}, types.Route{Dst: *pBNet}, types.Route{Dst: *pCNet}) + result020s[0].IP4.Routes = routes + } + return result020s, nil +} + func cmdDel(args *skel.CmdArgs) error { if err := utils.DeleteAllVeth(args.Netns); err != nil { return err diff --git a/go.mod b/go.mod index 9ab6a0ab..edc45752 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module git.code.oa.com/tkestack/galaxy go 1.13 require ( - git.tencent.com/tke/tapp-controller v0.0.0-20190903090859-2b03d3f3bbca github.com/containernetworking/cni v0.6.0 github.com/containernetworking/plugins v0.6.0 github.com/coreos/go-iptables v0.4.3 @@ -41,6 +40,7 @@ require ( k8s.io/kube-openapi v0.0.0-20190918143330-0270cf2f1c1d // indirect k8s.io/kubernetes v1.16.0-alpha.0 k8s.io/utils v0.0.0-20191010214722-8d271d903fe4 + tkestack.io/tapp-controller v0.0.0-20191017074115-bdf9b4742398 ) replace ( diff --git a/go.sum b/go.sum index b13d22d5..294e9353 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.37.4 h1:glPeL3BQJsbF6aIIYfZizMwc5LTYz250bDMjttbBGAU= cloud.google.com/go v0.37.4/go.mod h1:NHPJ89PdicEuT9hdPXMROBD91xc5uRDxsMtSB16k7hw= -git.tencent.com/tke/tapp-controller v0.0.0-20190903090859-2b03d3f3bbca h1:pTgOqCAdxIYRUqQZLlDrHjIh+HWZV+xvxvQhgPFmb5k= -git.tencent.com/tke/tapp-controller v0.0.0-20190903090859-2b03d3f3bbca/go.mod h1:/fPdei8YS80Q+zX6USM08f5JzP3QKkihr4DLPkgKxxM= github.com/Azure/azure-sdk-for-go v21.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/Azure/go-autorest v11.1.2+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= @@ -530,4 +528,6 @@ sigs.k8s.io/structured-merge-diff v0.0.0-20190302045857-e85c7b244fd2/go.mod h1:w sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI= sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= +tkestack.io/tapp-controller v0.0.0-20191017074115-bdf9b4742398 h1:vFEijJFTblJHxG9AMVgVtkt8YifkLzQ9Grxuox/+Y7c= +tkestack.io/tapp-controller v0.0.0-20191017074115-bdf9b4742398/go.mod h1:+wwXNBMe6xDAe6geS2UxNvrimW9fMeuh68o9JiA2JOQ= vbom.ml/util v0.0.0-20160121211510-db5cfe13f5cc/go.mod h1:so/NYdZXCz+E3ZpW0uAoCj6uzU2+8OWDFv/HxUSs7kI= diff --git a/pkg/ipam/schedulerplugin/floatingip_plugin_test.go b/pkg/ipam/schedulerplugin/floatingip_plugin_test.go index 020a7aa2..ac569a27 100644 --- a/pkg/ipam/schedulerplugin/floatingip_plugin_test.go +++ b/pkg/ipam/schedulerplugin/floatingip_plugin_test.go @@ -18,8 +18,6 @@ import ( "git.code.oa.com/tkestack/galaxy/pkg/ipam/floatingip" "git.code.oa.com/tkestack/galaxy/pkg/ipam/schedulerplugin/util" "git.code.oa.com/tkestack/galaxy/pkg/utils/database" - fakeTAppCli "git.tencent.com/tke/tapp-controller/pkg/client/clientset/versioned/fake" - tappInformer "git.tencent.com/tke/tapp-controller/pkg/client/informers/externalversions" "github.com/jinzhu/gorm" appv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -31,6 +29,8 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" fakeV1 "k8s.io/client-go/kubernetes/typed/core/v1/fake" + fakeTAppCli "tkestack.io/tapp-controller/pkg/client/clientset/versioned/fake" + tappInformer "tkestack.io/tapp-controller/pkg/client/informers/externalversions" ) const ( diff --git a/pkg/ipam/schedulerplugin/resync.go b/pkg/ipam/schedulerplugin/resync.go index 394562ce..fb9eef65 100644 --- a/pkg/ipam/schedulerplugin/resync.go +++ b/pkg/ipam/schedulerplugin/resync.go @@ -13,13 +13,13 @@ import ( "git.code.oa.com/tkestack/galaxy/pkg/ipam/schedulerplugin/util" "git.code.oa.com/tkestack/galaxy/pkg/utils/database" "git.code.oa.com/tkestack/galaxy/pkg/utils/nets" - tappv1 "git.tencent.com/tke/tapp-controller/pkg/apis/tappcontroller/v1" appv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metaErrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" glog "k8s.io/klog" + tappv1 "tkestack.io/tapp-controller/pkg/apis/tappcontroller/v1" ) func (p *FloatingIPPlugin) storeReady() bool { diff --git a/pkg/ipam/schedulerplugin/types.go b/pkg/ipam/schedulerplugin/types.go index 4c6b80dc..99dfdfc1 100644 --- a/pkg/ipam/schedulerplugin/types.go +++ b/pkg/ipam/schedulerplugin/types.go @@ -3,12 +3,12 @@ package schedulerplugin import ( crd_clientset "git.code.oa.com/tkestack/galaxy/pkg/ipam/client/clientset/versioned" list "git.code.oa.com/tkestack/galaxy/pkg/ipam/client/listers/galaxy/v1alpha1" - "git.tencent.com/tke/tapp-controller/pkg/client/clientset/versioned" - "git.tencent.com/tke/tapp-controller/pkg/client/listers/tappcontroller/v1" extensionClient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/client-go/kubernetes" appv1 "k8s.io/client-go/listers/apps/v1" corev1lister "k8s.io/client-go/listers/core/v1" + "tkestack.io/tapp-controller/pkg/client/clientset/versioned" + "tkestack.io/tapp-controller/pkg/client/listers/tappcontroller/v1" ) type PluginFactoryArgs struct { diff --git a/pkg/ipam/server/server.go b/pkg/ipam/server/server.go index 240acdc5..3babd7ea 100644 --- a/pkg/ipam/server/server.go +++ b/pkg/ipam/server/server.go @@ -19,11 +19,8 @@ import ( "git.code.oa.com/tkestack/galaxy/pkg/ipam/server/options" "git.code.oa.com/tkestack/galaxy/pkg/utils/httputil" pageutil "git.code.oa.com/tkestack/galaxy/pkg/utils/page" - tappVersioned "git.tencent.com/tke/tapp-controller/pkg/client/clientset/versioned" - tappInformers "git.tencent.com/tke/tapp-controller/pkg/client/informers/externalversions" "github.com/emicklei/go-restful" "github.com/emicklei/go-restful-swagger12" - glog "k8s.io/klog" corev1 "k8s.io/api/core/v1" extensionClient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -37,6 +34,9 @@ import ( "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" + glog "k8s.io/klog" + tappVersioned "tkestack.io/tapp-controller/pkg/client/clientset/versioned" + tappInformers "tkestack.io/tapp-controller/pkg/client/informers/externalversions" ) type JsonConf struct { diff --git a/pkg/network/vlan/vlan.go b/pkg/network/vlan/vlan.go index 8cb27ded..5932e6d6 100644 --- a/pkg/network/vlan/vlan.go +++ b/pkg/network/vlan/vlan.go @@ -3,6 +3,7 @@ package vlan import ( "encoding/json" "fmt" + "github.com/vishvananda/netlink/nl" "net" "strings" "sync" @@ -11,7 +12,6 @@ import ( "git.code.oa.com/tkestack/galaxy/pkg/utils" "github.com/containernetworking/cni/pkg/types" "github.com/vishvananda/netlink" - "github.com/vishvananda/netlink/nl" ) const ( @@ -82,13 +82,7 @@ func (d *VlanDriver) Init() error { return nil } if d.PureMode() { - if err := utils.UnSetArpIgnore("all"); err != nil { - return err - } - if err := utils.UnSetArpIgnore(d.Device); err != nil { - return err - } - if err := utils.SetProxyArp(d.Device); err != nil { + if err := d.initPureModeArgs(); err != nil { return err } return utils.EnableNonlocalBind() @@ -110,60 +104,78 @@ func (d *VlanDriver) Init() error { return fmt.Errorf("No available address found on device %s", d.Device) } } else { - bri, err := getOrCreateBridge(d.DefaultBridgeName, device.Attrs().HardwareAddr) - if err != nil { + if err := d.initVlanBridgeDevice(device, filteredAddr); err != nil { return err } - if err := netlink.LinkSetUp(bri); err != nil { - return fmt.Errorf("Failed to set up bridge device %s: %v", d.DefaultBridgeName, err) - } - rs, err := netlink.RouteList(device, nl.FAMILY_V4) + } + return nil +} + +func (d *VlanDriver) initVlanBridgeDevice(device netlink.Link, filteredAddr []netlink.Addr) error { + bri, err := getOrCreateBridge(d.DefaultBridgeName, device.Attrs().HardwareAddr) + if err != nil { + return err + } + if err := netlink.LinkSetUp(bri); err != nil { + return fmt.Errorf("failed to set up bridge device %s: %v", d.DefaultBridgeName, err) + } + rs, err := netlink.RouteList(device, nl.FAMILY_V4) + if err != nil { + return fmt.Errorf("failed to list route of device %s", device.Attrs().Name) + } + defer func() { if err != nil { - return fmt.Errorf("Failed to list route of device %s", device.Attrs().Name) + for i := range rs { + _ = netlink.RouteAdd(&rs[i]) + } + } + }() + for i := range filteredAddr { + if err = netlink.AddrDel(device, &filteredAddr[i]); err != nil { + return fmt.Errorf("failed to remove v4address from device %s: %v", d.Device, err) } + // nolint: errcheck defer func() { if err != nil { - for i := range rs { - _ = netlink.RouteAdd(&rs[i]) - } + netlink.AddrAdd(device, &filteredAddr[i]) } }() - for i := range filteredAddr { - if err = netlink.AddrDel(device, &filteredAddr[i]); err != nil { - return fmt.Errorf("Failed to remove v4address from device %s: %v", d.Device, err) - } - // nolint: errcheck - defer func() { - if err != nil { - netlink.AddrAdd(device, &filteredAddr[i]) - } - }() - filteredAddr[i].Label = "" - if err = netlink.AddrAdd(bri, &filteredAddr[i]); err != nil { - if !strings.Contains(err.Error(), "file exists") { - return fmt.Errorf("Failed to add v4address to bridge device %s: %v, address %v", d.DefaultBridgeName, err, filteredAddr[i]) - } else { - err = nil - } + filteredAddr[i].Label = "" + if err = netlink.AddrAdd(bri, &filteredAddr[i]); err != nil { + if !strings.Contains(err.Error(), "file exists") { + return fmt.Errorf("failed to add v4address to bridge device %s: %v, address %v", d.DefaultBridgeName, err, filteredAddr[i]) + } else { + err = nil } } - if err = netlink.LinkSetMaster(device, &netlink.Bridge{LinkAttrs: netlink.LinkAttrs{Name: d.DefaultBridgeName}}); err != nil { - return fmt.Errorf("Failed to add device %s to bridge device %s: %v", d.Device, d.DefaultBridgeName, err) - } - for i := range rs { - newRoute := netlink.Route{Gw: rs[i].Gw, LinkIndex: bri.Attrs().Index, Dst: rs[i].Dst, Src: rs[i].Src, Scope: rs[i].Scope} - if err = netlink.RouteAdd(&newRoute); err != nil { - if !strings.Contains(err.Error(), "file exists") { - return fmt.Errorf("failed to add route %s", newRoute.String()) - } else { - err = nil - } + } + if err = netlink.LinkSetMaster(device, &netlink.Bridge{LinkAttrs: netlink.LinkAttrs{Name: d.DefaultBridgeName}}); err != nil { + return fmt.Errorf("failed to add device %s to bridge device %s: %v", d.Device, d.DefaultBridgeName, err) + } + for i := range rs { + newRoute := netlink.Route{Gw: rs[i].Gw, LinkIndex: bri.Attrs().Index, Dst: rs[i].Dst, Src: rs[i].Src, Scope: rs[i].Scope} + if err = netlink.RouteAdd(&newRoute); err != nil { + if !strings.Contains(err.Error(), "file exists") { + return fmt.Errorf("failed to add route %s", newRoute.String()) } } } return nil } +func (d *VlanDriver) initPureModeArgs() error { + if err := utils.UnSetArpIgnore("all"); err != nil { + return err + } + if err := utils.UnSetArpIgnore(d.Device); err != nil { + return err + } + if err := utils.SetProxyArp(d.Device); err != nil { + return err + } + return nil +} + func getOrCreateBridge(bridgeName string, mac net.HardwareAddr) (netlink.Link, error) { return getOrCreateDevice(bridgeName, func(name string) error { if err := utils.CreateBridgeDevice(bridgeName, mac); err != nil { diff --git a/pkg/policy/policy.go b/pkg/policy/policy.go index de7f7ba8..375fb0d7 100644 --- a/pkg/policy/policy.go +++ b/pkg/policy/policy.go @@ -14,7 +14,6 @@ import ( "git.code.oa.com/tkestack/galaxy/pkg/api/k8s/eventhandler" "git.code.oa.com/tkestack/galaxy/pkg/utils/ipset" utiliptables "git.code.oa.com/tkestack/galaxy/pkg/utils/iptables" - glog "k8s.io/klog" corev1 "k8s.io/api/core/v1" networkv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -27,6 +26,7 @@ import ( corev1Lister "k8s.io/client-go/listers/core/v1" networkingv1Lister "k8s.io/client-go/listers/networking/v1" "k8s.io/client-go/tools/cache" + glog "k8s.io/klog" utildbus "k8s.io/kubernetes/pkg/util/dbus" utilexec "k8s.io/utils/exec" ) @@ -447,65 +447,11 @@ func (p *PolicyManager) syncRules(polices []policy) error { return fmt.Errorf("failed to list ipsets: %v", err) } // build new ipset table map - newIPSetMap := make(map[string]*ipsetTable) - for _, policy := range polices { - ingress := policy.ingressRule - egress := policy.egressRule - if ingress != nil { - newIPSetMap[ingress.dstIPTable.Name] = ingress.dstIPTable - for _, rule := range ingress.srcRules { - if rule.ipTable != nil { - newIPSetMap[rule.ipTable.Name] = rule.ipTable - } - if rule.netTable != nil { - newIPSetMap[rule.netTable.Name] = rule.netTable - } - } - } - if egress != nil { - newIPSetMap[egress.srcIPTable.Name] = egress.srcIPTable - for _, rule := range egress.dstRules { - if rule.ipTable != nil { - newIPSetMap[rule.ipTable.Name] = rule.ipTable - } - if rule.netTable != nil { - newIPSetMap[rule.netTable.Name] = rule.netTable - } - } - } - } + newIPSetMap := initIPSetMap(polices) + // create ipset - for name, set := range newIPSetMap { - if err := p.ipsetHandle.CreateSet(&set.IPSet, true); err != nil { - return fmt.Errorf("failed to create ipset %s %s: %v", set.Name, string(set.SetType), err) - } - oldEntries, err := p.ipsetHandle.ListEntries(name) - if err != nil { - glog.Warningf("failed to list entries %s: %v", name, err) - continue - } - oldEntriesSet := sets.NewString(oldEntries...) - newEntries := sets.NewString() - for _, entry := range set.entries { - newEntryStr := strings.Join(append([]string{entry.String()}, entry.Options...), " ") - newEntries.Insert(newEntryStr) - if oldEntriesSet.Has(newEntryStr) { - continue - } - if err := p.ipsetHandle.AddEntryWithOptions(&entry, &set.IPSet, true); err != nil { - glog.Warningf("failed to add entry %v: %v", entry, err) - } - } - glog.V(5).Infof("old entries %s, new entries %s", strings.Join(oldEntries, ","), strings.Join(newEntries.List(), ",")) - // clean up stale entries - for _, old := range oldEntries { - if !newEntries.Has(old) { - parts := strings.Split(old, " ") - if err := p.ipsetHandle.DelEntryWithOptions(name, parts[0], parts[1:]...); err != nil { - glog.Warningf("failed to del entry %s from set %s: %v", old, name, err) - } - } - } + if err := p.createIPSet(newIPSetMap); err != nil { + return err } // nolint: errcheck defer func() { @@ -521,6 +467,10 @@ func (p *PolicyManager) syncRules(polices []policy) error { }() // sync iptables + return p.syncIptables(polices) +} + +func (p *PolicyManager) syncIptables(polices []policy) error { iptablesSaveRaw := bytes.NewBuffer(nil) // Get iptables-save output so we can check for existing chains and rules. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore @@ -591,13 +541,80 @@ func (p *PolicyManager) syncRules(polices []policy) error { writeLine(filterRules, "COMMIT") lines := append(filterChains.Bytes(), filterRules.Bytes()...) - err = p.iptableHandle.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) + err := p.iptableHandle.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) if err != nil { return fmt.Errorf("failed to execute iptables-restore for ruls %s: %v", string(lines), err) } return nil } +func initIPSetMap(polices []policy) map[string]*ipsetTable { + newIPSetMap := make(map[string]*ipsetTable) + for _, policy := range polices { + ingress := policy.ingressRule + egress := policy.egressRule + if ingress != nil { + newIPSetMap[ingress.dstIPTable.Name] = ingress.dstIPTable + for _, rule := range ingress.srcRules { + if rule.ipTable != nil { + newIPSetMap[rule.ipTable.Name] = rule.ipTable + } + if rule.netTable != nil { + newIPSetMap[rule.netTable.Name] = rule.netTable + } + } + } + if egress != nil { + newIPSetMap[egress.srcIPTable.Name] = egress.srcIPTable + for _, rule := range egress.dstRules { + if rule.ipTable != nil { + newIPSetMap[rule.ipTable.Name] = rule.ipTable + } + if rule.netTable != nil { + newIPSetMap[rule.netTable.Name] = rule.netTable + } + } + } + } + return newIPSetMap +} + +func (p *PolicyManager) createIPSet(newIPSetMap map[string]*ipsetTable) error { + for name, set := range newIPSetMap { + if err := p.ipsetHandle.CreateSet(&set.IPSet, true); err != nil { + return fmt.Errorf("failed to create ipset %s %s: %v", set.Name, string(set.SetType), err) + } + oldEntries, err := p.ipsetHandle.ListEntries(name) + if err != nil { + glog.Warningf("failed to list entries %s: %v", name, err) + continue + } + oldEntriesSet := sets.NewString(oldEntries...) + newEntries := sets.NewString() + for _, entry := range set.entries { + newEntryStr := strings.Join(append([]string{entry.String()}, entry.Options...), " ") + newEntries.Insert(newEntryStr) + if oldEntriesSet.Has(newEntryStr) { + continue + } + if err := p.ipsetHandle.AddEntryWithOptions(&entry, &set.IPSet, true); err != nil { + glog.Warningf("failed to add entry %v: %v", entry, err) + } + } + glog.V(5).Infof("old entries %s, new entries %s", strings.Join(oldEntries, ","), strings.Join(newEntries.List(), ",")) + // clean up stale entries + for _, old := range oldEntries { + if !newEntries.Has(old) { + parts := strings.Split(old, " ") + if err := p.ipsetHandle.DelEntryWithOptions(name, parts[0], parts[1:]...); err != nil { + glog.Warningf("failed to del entry %s from set %s: %v", old, name, err) + } + } + } + } + return nil +} + // The rule maybe // -A GLX-PLCY-XXXX -m comment --comment "name_namespace -p tcp \ // -m set --match-set GLX-sip-xxxx src \