Skip to content

Commit

Permalink
Liqo-route: liqo.host forward
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 authored and adamjensenbot committed Sep 7, 2023
1 parent 459f2e3 commit 0c8a3e2
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 26 deletions.
8 changes: 6 additions & 2 deletions cmd/liqonet/gateway-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"context"
"flag"
"os"
"sync"
Expand All @@ -37,6 +38,7 @@ import (
liqonetns "github.com/liqotech/liqo/pkg/liqonet/netns"
liqonetutils "github.com/liqotech/liqo/pkg/liqonet/utils"
"github.com/liqotech/liqo/pkg/liqonet/utils/links"
liqonetsignals "github.com/liqotech/liqo/pkg/liqonet/utils/signals"
"github.com/liqotech/liqo/pkg/utils/mapper"
"github.com/liqotech/liqo/pkg/utils/restcfg"
)
Expand Down Expand Up @@ -73,6 +75,8 @@ func addGatewayOperatorFlags(liqonet *gatewayOperatorFlags) {
}

func runGatewayOperator(commonFlags *liqonetCommonFlags, gatewayFlags *gatewayOperatorFlags) {
ctx, _ := liqonetsignals.NotifyContextPosix(context.Background(), liqonetsignals.ShutdownSignals...)
wg := sync.WaitGroup{}
metricsAddr := commonFlags.metricsAddr
enableLeaderElection := gatewayFlags.enableLeaderElection
leaseDuration := gatewayFlags.leaseDuration
Expand Down Expand Up @@ -152,7 +156,7 @@ func runGatewayOperator(commonFlags *liqonetCommonFlags, gatewayFlags *gatewayOp
klog.Errorf("unable to setup labeler controller: %s", err)
os.Exit(1)
}
tunnelController, err := tunneloperator.NewTunnelController(podIP.String(), podNamespace, eventRecorder,
tunnelController, err := tunneloperator.NewTunnelController(ctx, &wg, podIP.String(), podNamespace, eventRecorder,
clientset, main.GetClient(), &readyClustersMutex, readyClusters, gatewayNetns, hostNetns, int(MTU), int(port), updateStatusInterval)
// If something goes wrong while creating and configuring the tunnel controller
// then make sure that we remove all the resources created during the create process.
Expand Down Expand Up @@ -184,7 +188,7 @@ func runGatewayOperator(commonFlags *liqonetCommonFlags, gatewayFlags *gatewayOp
}

klog.Info("Starting manager as Tunnel-Operator")
if err := main.Start(tunnelController.SetupSignalHandlerForTunnelOperator()); err != nil {
if err := main.Start(tunnelController.SetupSignalHandlerForTunnelOperator(ctx, &wg)); err != nil {
klog.Errorf("unable to start tunnel controller: %s", err)
os.Exit(1)
}
Expand Down
95 changes: 95 additions & 0 deletions internal/liqonet/tunnel-operator/firewall.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2019-2023 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tunneloperator

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/coreos/go-iptables/iptables"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

liqonetsignals "github.com/liqotech/liqo/pkg/liqonet/utils/signals"
)

const (
filterTable = "filter"
)

type firewallRule struct {
table string
chain string
rule []string
}

func (fr *firewallRule) String() string {
ruleString := strings.Join(fr.rule, " ")
return strings.Join([]string{fr.table, fr.chain, ruleString}, " ")
}

func enforceFirewallRules(ctx context.Context, wg *sync.WaitGroup, ipt *iptables.IPTables, ifaceName string) {
wg.Add(1)
defer wg.Done()
err := wait.PollUntilContextCancel(ctx, 5*time.Second, false, func(ctx context.Context) (done bool, err error) {
rules := generateForwardingRules(ifaceName)
for i := range rules {
if err := addRule(ipt, &rules[i]); err != nil {
return false, err
}
}
return false, nil
})
if err != nil && ctx.Err() == nil {
klog.Errorf("Unable to enforce firewall rules: %v", err)
utilruntime.Must(liqonetsignals.Shutdown())
}
}

// generateRules generates the firewall rules for the given overlay interface.
func generateForwardingRules(ifaceName string) []firewallRule {
comment := fmt.Sprintf("LIQO accept traffic from/to overlay interface %s", ifaceName)
return []firewallRule{
{
table: filterTable,
chain: "INPUT",
rule: []string{"-i", ifaceName, "-j", "ACCEPT", "-m", "comment", "--comment", comment},
},
{
table: filterTable,
chain: "FORWARD",
rule: []string{"-i", ifaceName, "-j", "ACCEPT", "-m", "comment", "--comment", comment},
},
{
table: filterTable,
chain: "OUTPUT",
rule: []string{"-o", ifaceName, "-j", "ACCEPT", "-m", "comment", "--comment", comment},
},
}
}

// addRule appends the rule if it does not exist.
func addRule(ipt *iptables.IPTables, rule *firewallRule) error {
return ipt.AppendUnique(rule.table, rule.chain, rule.rule...)
}

// deleteRule removes the rule if it exists.
func deleteRule(ipt *iptables.IPTables, rule *firewallRule) error {
return ipt.DeleteIfExists(rule.table, rule.chain, rule.rule...)
}
40 changes: 22 additions & 18 deletions internal/liqonet/tunnel-operator/tunnel-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"net"
"os"
"reflect"
"strings"
"sync"
Expand Down Expand Up @@ -49,7 +48,6 @@ import (
"github.com/liqotech/liqo/pkg/liqonet/tunnel"
tunnelwg "github.com/liqotech/liqo/pkg/liqonet/tunnel/wireguard"
liqonetutils "github.com/liqotech/liqo/pkg/liqonet/utils"
liqonetsignals "github.com/liqotech/liqo/pkg/liqonet/utils/signals"
liqolabels "github.com/liqotech/liqo/pkg/utils/labels"
)

Expand Down Expand Up @@ -84,7 +82,8 @@ type TunnelController struct {
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch

// NewTunnelController instantiates and initializes the tunnel controller.
func NewTunnelController(podIP, namespace string, er record.EventRecorder, k8sClient k8s.Interface, cl client.Client,
func NewTunnelController(ctx context.Context, wg *sync.WaitGroup,
podIP, namespace string, er record.EventRecorder, k8sClient k8s.Interface, cl client.Client,
readyClustersMutex *sync.Mutex, readyClusters map[string]struct{}, gatewayNetns, hostNetns ns.NetNS, mtu, port int,
updateStatusInterval time.Duration) (*TunnelController, error) {
tunnelEndpointFinalizer := liqoconst.LiqoGatewayOperatorName + "." + liqoconst.FinalizersSuffix
Expand Down Expand Up @@ -113,7 +112,7 @@ func NewTunnelController(podIP, namespace string, er record.EventRecorder, k8sCl
if err != nil {
return nil, fmt.Errorf("failed to retrieve tunnel iface from host netns: %w", err)
}
if err = tc.setUpGWNetns(liqoconst.HostVethName, liqoconst.GatewayVethName, mtu); err != nil {
if err = tc.setUpGWNetns(ctx, wg, liqoconst.HostVethName, liqoconst.GatewayVethName, mtu); err != nil {
return nil, fmt.Errorf("failed to setup gateway netns: %w", err)
}
// Move wireguard interface in the gateway network namespace.
Expand Down Expand Up @@ -416,18 +415,16 @@ func (tc *TunnelController) EnsureIPTablesRulesPerCluster(tep *netv1alpha1.Tunne

// SetupSignalHandlerForTunnelOperator registers for SIGTERM, SIGINT, SIGKILL. A context is returned
// which is closed on one of these signals.
func (tc *TunnelController) SetupSignalHandlerForTunnelOperator() context.Context {
ctx, done := context.WithCancel(context.Background())
c := make(chan os.Signal, 1)
liqonetsignals.NotifyPosix(c, liqonetsignals.ShutdownSignals...)
func (tc *TunnelController) SetupSignalHandlerForTunnelOperator(ctx context.Context, wg *sync.WaitGroup) context.Context {
opctx, done := context.WithCancel(context.Background())
go func(tc *TunnelController) {
sig := <-c
klog.Infof("the operator received signal {%s}: cleaning up", sig.String())
// Here, the error is not checked, as at exit time is not possible to recover. Errors are just logged.
<-ctx.Done()
klog.Info("the operator received a shutdown signal: cleaning up")
wg.Wait()
tc.CleanUpConfiguration(liqoconst.GatewayNetnsName)
done()
}(tc)
return ctx
return opctx
}

// SetupWithManager configures the current controller to be managed by the given manager.
Expand Down Expand Up @@ -496,7 +493,7 @@ func (tc *TunnelController) SetUpRouteManager() error {
return nil
}

func (tc *TunnelController) setUpGWNetns(hostVethName, gatewayVethName string, vethMtu int) error {
func (tc *TunnelController) setUpGWNetns(ctx context.Context, wg *sync.WaitGroup, hostVethName, gatewayVethName string, vethMtu int) error {
// Create veth pair to connect the two namespaces.
hostVeth, gatewayVeth, err := liqonetns.CreateVethPair(hostVethName, gatewayVethName, tc.hostNetns, tc.gatewayNetns, vethMtu)
if err != nil {
Expand All @@ -520,17 +517,16 @@ func (tc *TunnelController) setUpGWNetns(hostVethName, gatewayVethName string, v
return err
}

// Configure forwarding rule from hostveth to vxlan.
go enforceFirewallRules(ctx, wg, &tc.Ipt, hostVeth.Name)

// Configure the static neighbor entries, and subscribe to the events to perform updates in case the veth MAC address changes.
return liqonetns.RegisterOnVethHwAddrChangeHandler(tc.hostNetns, hostVethName, func(hwaddr net.HardwareAddr) error {
if err := liqonetns.ConfigureVethNeigh(&hostVeth, liqoconst.GatewayVethIPAddr, gatewayVeth.HardwareAddr, tc.hostNetns); err != nil {
return err
}

if err := liqonetns.ConfigureVethNeigh(&gatewayVeth, liqoconst.HostVethIPAddr, hwaddr, tc.gatewayNetns); err != nil {
return err
}

return nil
return liqonetns.ConfigureVethNeigh(&gatewayVeth, liqoconst.HostVethIPAddr, hwaddr, tc.gatewayNetns)
})
}

Expand Down Expand Up @@ -589,6 +585,14 @@ func (tc *TunnelController) cleanupRouteFinalizers(ctx context.Context, tep *net
func (tc *TunnelController) CleanUpConfiguration(netnsName string) {
klog.Infof("cleaning up...")

klog.V(4).Infof("deleting iptables rules for cluster {%s}", tc.namespace)
fwRules := generateForwardingRules(tc.hostVeth.Name)
for i := range fwRules {
if err := deleteRule(&tc.Ipt, &fwRules[i]); err != nil {
klog.Errorf("an error occurred while deleting iptables rule {%s}: %v", &fwRules[i], err)
}
}

klog.V(4).Infof("deleting neigh entry with mac {%s} and dst {%s} on device {%s}",
tc.gatewayVeth.HardwareAddr.String(), liqoconst.GatewayVethIPAddr, tc.hostVeth.Name)
if _, err := liqonetns.DelNeigh(net.ParseIP(liqoconst.GatewayVethIPAddr), tc.gatewayVeth.HardwareAddr, &tc.hostVeth); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,7 @@ func initIPTables() error {
if err := ipt.EnsureChainRulesPerCluster(tep1); err != nil {
return err
}
if err := ipt.EnsureChainRulesPerCluster(tep2); err != nil {
return err
}
return nil
return ipt.EnsureChainRulesPerCluster(tep2)
})
return err
}
11 changes: 9 additions & 2 deletions internal/liqonet/tunnel-operator/tunnel_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package tunneloperator

import (
"sync"

"github.com/containernetworking/plugins/pkg/ns"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand All @@ -23,6 +25,8 @@ import (
liqoconst "github.com/liqotech/liqo/pkg/consts"
)

var wg *sync.WaitGroup

var _ = Describe("TunnelOperator", func() {
Describe("setup gateway namespace", func() {
Context("configuring the new gateway namespace", func() {
Expand All @@ -38,8 +42,11 @@ var _ = Describe("TunnelOperator", func() {
Expect(netlink.LinkDel(link)).ShouldNot(HaveOccurred())
}
})
BeforeEach(func() {
wg = &sync.WaitGroup{}
})
It("should return nil", func() {
err := tc.setUpGWNetns(liqoconst.HostVethName, liqoconst.GatewayVethName, 1420)
err := tc.setUpGWNetns(ctx, wg, liqoconst.HostVethName, liqoconst.GatewayVethName, 1420)
Expect(err).ShouldNot(HaveOccurred())
// Check that we have the veth interface in host namespace
err = tc.hostNetns.Do(func(ns ns.NetNS) error {
Expand All @@ -62,7 +69,7 @@ var _ = Describe("TunnelOperator", func() {
})

It("incorrect name for veth interface, should return error", func() {
err := tc.setUpGWNetns("", liqoconst.GatewayVethName, 1420)
err := tc.setUpGWNetns(ctx, wg, "", liqoconst.GatewayVethName, 1420)
Expect(err).Should(HaveOccurred())
Expect(err).Should(MatchError("an error occurred while creating veth pair between host and gateway namespace: " +
"failed to make veth pair: LinkAttrs.Name cannot be empty"))
Expand Down

0 comments on commit 0c8a3e2

Please sign in to comment.