From 0c8a3e27ca0a5c0a4eb2d8937e36b84d16305c94 Mon Sep 17 00:00:00 2001 From: Francesco Cheinasso Date: Mon, 4 Sep 2023 13:36:10 +0200 Subject: [PATCH] Liqo-route: liqo.host forward --- cmd/liqonet/gateway-operator.go | 8 +- internal/liqonet/tunnel-operator/firewall.go | 95 +++++++++++++++++++ .../tunnel-operator/tunnel-operator.go | 40 ++++---- .../tunnel_operator_suite_test.go | 5 +- .../tunnel-operator/tunnel_operator_test.go | 11 ++- 5 files changed, 133 insertions(+), 26 deletions(-) create mode 100644 internal/liqonet/tunnel-operator/firewall.go diff --git a/cmd/liqonet/gateway-operator.go b/cmd/liqonet/gateway-operator.go index 8f7f8f5829..468574e893 100644 --- a/cmd/liqonet/gateway-operator.go +++ b/cmd/liqonet/gateway-operator.go @@ -15,6 +15,7 @@ package main import ( + "context" "flag" "os" "sync" @@ -37,6 +38,7 @@ import ( liqonetns "github.com/liqotech/liqo/pkg/liqonet/netns" liqonetutils "github.com/liqotech/liqo/pkg/liqonet/utils" "github.com/liqotech/liqo/pkg/liqonet/utils/links" + liqonetsignals "github.com/liqotech/liqo/pkg/liqonet/utils/signals" "github.com/liqotech/liqo/pkg/utils/mapper" "github.com/liqotech/liqo/pkg/utils/restcfg" ) @@ -73,6 +75,8 @@ func addGatewayOperatorFlags(liqonet *gatewayOperatorFlags) { } func runGatewayOperator(commonFlags *liqonetCommonFlags, gatewayFlags *gatewayOperatorFlags) { + ctx, _ := liqonetsignals.NotifyContextPosix(context.Background(), liqonetsignals.ShutdownSignals...) + wg := sync.WaitGroup{} metricsAddr := commonFlags.metricsAddr enableLeaderElection := gatewayFlags.enableLeaderElection leaseDuration := gatewayFlags.leaseDuration @@ -152,7 +156,7 @@ func runGatewayOperator(commonFlags *liqonetCommonFlags, gatewayFlags *gatewayOp klog.Errorf("unable to setup labeler controller: %s", err) os.Exit(1) } - tunnelController, err := tunneloperator.NewTunnelController(podIP.String(), podNamespace, eventRecorder, + tunnelController, err := tunneloperator.NewTunnelController(ctx, &wg, podIP.String(), podNamespace, eventRecorder, clientset, main.GetClient(), &readyClustersMutex, readyClusters, gatewayNetns, hostNetns, int(MTU), int(port), updateStatusInterval) // If something goes wrong while creating and configuring the tunnel controller // then make sure that we remove all the resources created during the create process. @@ -184,7 +188,7 @@ func runGatewayOperator(commonFlags *liqonetCommonFlags, gatewayFlags *gatewayOp } klog.Info("Starting manager as Tunnel-Operator") - if err := main.Start(tunnelController.SetupSignalHandlerForTunnelOperator()); err != nil { + if err := main.Start(tunnelController.SetupSignalHandlerForTunnelOperator(ctx, &wg)); err != nil { klog.Errorf("unable to start tunnel controller: %s", err) os.Exit(1) } diff --git a/internal/liqonet/tunnel-operator/firewall.go b/internal/liqonet/tunnel-operator/firewall.go new file mode 100644 index 0000000000..2798ef2f28 --- /dev/null +++ b/internal/liqonet/tunnel-operator/firewall.go @@ -0,0 +1,95 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tunneloperator + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/coreos/go-iptables/iptables" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + + liqonetsignals "github.com/liqotech/liqo/pkg/liqonet/utils/signals" +) + +const ( + filterTable = "filter" +) + +type firewallRule struct { + table string + chain string + rule []string +} + +func (fr *firewallRule) String() string { + ruleString := strings.Join(fr.rule, " ") + return strings.Join([]string{fr.table, fr.chain, ruleString}, " ") +} + +func enforceFirewallRules(ctx context.Context, wg *sync.WaitGroup, ipt *iptables.IPTables, ifaceName string) { + wg.Add(1) + defer wg.Done() + err := wait.PollUntilContextCancel(ctx, 5*time.Second, false, func(ctx context.Context) (done bool, err error) { + rules := generateForwardingRules(ifaceName) + for i := range rules { + if err := addRule(ipt, &rules[i]); err != nil { + return false, err + } + } + return false, nil + }) + if err != nil && ctx.Err() == nil { + klog.Errorf("Unable to enforce firewall rules: %v", err) + utilruntime.Must(liqonetsignals.Shutdown()) + } +} + +// generateRules generates the firewall rules for the given overlay interface. +func generateForwardingRules(ifaceName string) []firewallRule { + comment := fmt.Sprintf("LIQO accept traffic from/to overlay interface %s", ifaceName) + return []firewallRule{ + { + table: filterTable, + chain: "INPUT", + rule: []string{"-i", ifaceName, "-j", "ACCEPT", "-m", "comment", "--comment", comment}, + }, + { + table: filterTable, + chain: "FORWARD", + rule: []string{"-i", ifaceName, "-j", "ACCEPT", "-m", "comment", "--comment", comment}, + }, + { + table: filterTable, + chain: "OUTPUT", + rule: []string{"-o", ifaceName, "-j", "ACCEPT", "-m", "comment", "--comment", comment}, + }, + } +} + +// addRule appends the rule if it does not exist. +func addRule(ipt *iptables.IPTables, rule *firewallRule) error { + return ipt.AppendUnique(rule.table, rule.chain, rule.rule...) +} + +// deleteRule removes the rule if it exists. +func deleteRule(ipt *iptables.IPTables, rule *firewallRule) error { + return ipt.DeleteIfExists(rule.table, rule.chain, rule.rule...) +} diff --git a/internal/liqonet/tunnel-operator/tunnel-operator.go b/internal/liqonet/tunnel-operator/tunnel-operator.go index 41ad89ea7d..c8347a7724 100644 --- a/internal/liqonet/tunnel-operator/tunnel-operator.go +++ b/internal/liqonet/tunnel-operator/tunnel-operator.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "net" - "os" "reflect" "strings" "sync" @@ -49,7 +48,6 @@ import ( "github.com/liqotech/liqo/pkg/liqonet/tunnel" tunnelwg "github.com/liqotech/liqo/pkg/liqonet/tunnel/wireguard" liqonetutils "github.com/liqotech/liqo/pkg/liqonet/utils" - liqonetsignals "github.com/liqotech/liqo/pkg/liqonet/utils/signals" liqolabels "github.com/liqotech/liqo/pkg/utils/labels" ) @@ -84,7 +82,8 @@ type TunnelController struct { // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch // NewTunnelController instantiates and initializes the tunnel controller. -func NewTunnelController(podIP, namespace string, er record.EventRecorder, k8sClient k8s.Interface, cl client.Client, +func NewTunnelController(ctx context.Context, wg *sync.WaitGroup, + podIP, namespace string, er record.EventRecorder, k8sClient k8s.Interface, cl client.Client, readyClustersMutex *sync.Mutex, readyClusters map[string]struct{}, gatewayNetns, hostNetns ns.NetNS, mtu, port int, updateStatusInterval time.Duration) (*TunnelController, error) { tunnelEndpointFinalizer := liqoconst.LiqoGatewayOperatorName + "." + liqoconst.FinalizersSuffix @@ -113,7 +112,7 @@ func NewTunnelController(podIP, namespace string, er record.EventRecorder, k8sCl if err != nil { return nil, fmt.Errorf("failed to retrieve tunnel iface from host netns: %w", err) } - if err = tc.setUpGWNetns(liqoconst.HostVethName, liqoconst.GatewayVethName, mtu); err != nil { + if err = tc.setUpGWNetns(ctx, wg, liqoconst.HostVethName, liqoconst.GatewayVethName, mtu); err != nil { return nil, fmt.Errorf("failed to setup gateway netns: %w", err) } // Move wireguard interface in the gateway network namespace. @@ -416,18 +415,16 @@ func (tc *TunnelController) EnsureIPTablesRulesPerCluster(tep *netv1alpha1.Tunne // SetupSignalHandlerForTunnelOperator registers for SIGTERM, SIGINT, SIGKILL. A context is returned // which is closed on one of these signals. -func (tc *TunnelController) SetupSignalHandlerForTunnelOperator() context.Context { - ctx, done := context.WithCancel(context.Background()) - c := make(chan os.Signal, 1) - liqonetsignals.NotifyPosix(c, liqonetsignals.ShutdownSignals...) +func (tc *TunnelController) SetupSignalHandlerForTunnelOperator(ctx context.Context, wg *sync.WaitGroup) context.Context { + opctx, done := context.WithCancel(context.Background()) go func(tc *TunnelController) { - sig := <-c - klog.Infof("the operator received signal {%s}: cleaning up", sig.String()) - // Here, the error is not checked, as at exit time is not possible to recover. Errors are just logged. + <-ctx.Done() + klog.Info("the operator received a shutdown signal: cleaning up") + wg.Wait() tc.CleanUpConfiguration(liqoconst.GatewayNetnsName) done() }(tc) - return ctx + return opctx } // SetupWithManager configures the current controller to be managed by the given manager. @@ -496,7 +493,7 @@ func (tc *TunnelController) SetUpRouteManager() error { return nil } -func (tc *TunnelController) setUpGWNetns(hostVethName, gatewayVethName string, vethMtu int) error { +func (tc *TunnelController) setUpGWNetns(ctx context.Context, wg *sync.WaitGroup, hostVethName, gatewayVethName string, vethMtu int) error { // Create veth pair to connect the two namespaces. hostVeth, gatewayVeth, err := liqonetns.CreateVethPair(hostVethName, gatewayVethName, tc.hostNetns, tc.gatewayNetns, vethMtu) if err != nil { @@ -520,17 +517,16 @@ func (tc *TunnelController) setUpGWNetns(hostVethName, gatewayVethName string, v return err } + // Configure forwarding rule from hostveth to vxlan. + go enforceFirewallRules(ctx, wg, &tc.Ipt, hostVeth.Name) + // Configure the static neighbor entries, and subscribe to the events to perform updates in case the veth MAC address changes. return liqonetns.RegisterOnVethHwAddrChangeHandler(tc.hostNetns, hostVethName, func(hwaddr net.HardwareAddr) error { if err := liqonetns.ConfigureVethNeigh(&hostVeth, liqoconst.GatewayVethIPAddr, gatewayVeth.HardwareAddr, tc.hostNetns); err != nil { return err } - if err := liqonetns.ConfigureVethNeigh(&gatewayVeth, liqoconst.HostVethIPAddr, hwaddr, tc.gatewayNetns); err != nil { - return err - } - - return nil + return liqonetns.ConfigureVethNeigh(&gatewayVeth, liqoconst.HostVethIPAddr, hwaddr, tc.gatewayNetns) }) } @@ -589,6 +585,14 @@ func (tc *TunnelController) cleanupRouteFinalizers(ctx context.Context, tep *net func (tc *TunnelController) CleanUpConfiguration(netnsName string) { klog.Infof("cleaning up...") + klog.V(4).Infof("deleting iptables rules for cluster {%s}", tc.namespace) + fwRules := generateForwardingRules(tc.hostVeth.Name) + for i := range fwRules { + if err := deleteRule(&tc.Ipt, &fwRules[i]); err != nil { + klog.Errorf("an error occurred while deleting iptables rule {%s}: %v", &fwRules[i], err) + } + } + klog.V(4).Infof("deleting neigh entry with mac {%s} and dst {%s} on device {%s}", tc.gatewayVeth.HardwareAddr.String(), liqoconst.GatewayVethIPAddr, tc.hostVeth.Name) if _, err := liqonetns.DelNeigh(net.ParseIP(liqoconst.GatewayVethIPAddr), tc.gatewayVeth.HardwareAddr, &tc.hostVeth); err != nil { diff --git a/internal/liqonet/tunnel-operator/tunnel_operator_suite_test.go b/internal/liqonet/tunnel-operator/tunnel_operator_suite_test.go index 3b62e7b79f..43e7109cdf 100644 --- a/internal/liqonet/tunnel-operator/tunnel_operator_suite_test.go +++ b/internal/liqonet/tunnel-operator/tunnel_operator_suite_test.go @@ -205,10 +205,7 @@ func initIPTables() error { if err := ipt.EnsureChainRulesPerCluster(tep1); err != nil { return err } - if err := ipt.EnsureChainRulesPerCluster(tep2); err != nil { - return err - } - return nil + return ipt.EnsureChainRulesPerCluster(tep2) }) return err } diff --git a/internal/liqonet/tunnel-operator/tunnel_operator_test.go b/internal/liqonet/tunnel-operator/tunnel_operator_test.go index 7caae77fc1..72030c171a 100644 --- a/internal/liqonet/tunnel-operator/tunnel_operator_test.go +++ b/internal/liqonet/tunnel-operator/tunnel_operator_test.go @@ -15,6 +15,8 @@ package tunneloperator import ( + "sync" + "github.com/containernetworking/plugins/pkg/ns" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -23,6 +25,8 @@ import ( liqoconst "github.com/liqotech/liqo/pkg/consts" ) +var wg *sync.WaitGroup + var _ = Describe("TunnelOperator", func() { Describe("setup gateway namespace", func() { Context("configuring the new gateway namespace", func() { @@ -38,8 +42,11 @@ var _ = Describe("TunnelOperator", func() { Expect(netlink.LinkDel(link)).ShouldNot(HaveOccurred()) } }) + BeforeEach(func() { + wg = &sync.WaitGroup{} + }) It("should return nil", func() { - err := tc.setUpGWNetns(liqoconst.HostVethName, liqoconst.GatewayVethName, 1420) + err := tc.setUpGWNetns(ctx, wg, liqoconst.HostVethName, liqoconst.GatewayVethName, 1420) Expect(err).ShouldNot(HaveOccurred()) // Check that we have the veth interface in host namespace err = tc.hostNetns.Do(func(ns ns.NetNS) error { @@ -62,7 +69,7 @@ var _ = Describe("TunnelOperator", func() { }) It("incorrect name for veth interface, should return error", func() { - err := tc.setUpGWNetns("", liqoconst.GatewayVethName, 1420) + err := tc.setUpGWNetns(ctx, wg, "", liqoconst.GatewayVethName, 1420) Expect(err).Should(HaveOccurred()) Expect(err).Should(MatchError("an error occurred while creating veth pair between host and gateway namespace: " + "failed to make veth pair: LinkAttrs.Name cannot be empty"))