Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fixing and enhancement network operators #268

Merged
merged 1 commit into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/liqonet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"strconv"
"strings"
"time"
// +kubebuilder:scaffold:imports
)
Expand Down Expand Up @@ -135,12 +136,11 @@ func main() {
if err != nil {
setupLog.Error(err, "unable to initialize iptables: %v. check if the ipatable are present in the system", err)
}

r := &liqonetOperators.RouteController{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("route-operator"),
Scheme: mgr.GetScheme(),
RouteOperator: runAsRouteOperator,
Recorder: mgr.GetEventRecorderFor(strings.Join([]string{"route-OP", nodeName}, "-")),
ClientSet: clientset,
RoutesPerRemoteCluster: make(map[string][]netlink.Route),
IsGateway: isGatewayNode,
Expand Down Expand Up @@ -178,6 +178,7 @@ func main() {
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("liqonetOperators").WithName("TunnelEndpoint"),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("tunnel-operator"),
TunnelIFacesPerRemoteCluster: make(map[string]int),
}
if err = r.SetupWithManager(mgr); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ rules:
- get
- patch
- update
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down
33 changes: 28 additions & 5 deletions internal/liqonet/route-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
k8sApiErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"net"
"os"
"os/signal"
Expand Down Expand Up @@ -54,7 +55,7 @@ type RouteController struct {
Log logr.Logger
Scheme *runtime.Scheme
clientset kubernetes.Clientset
RouteOperator bool
Recorder record.EventRecorder
NodeName string
ClientSet *kubernetes.Clientset
RemoteVTEPs []string
Expand Down Expand Up @@ -96,7 +97,14 @@ func (r *RouteController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
r.Log.Error(err, "unable to fetch endpoint")
return ctrl.Result{}, client.IgnoreNotFound(err)
}

//we can process a tunnelendpoint resource only if the tunnel interface has been created and configured.
//this network interface is managed by the tunnel operator, which sets this field when it processes the same
//tunnelendpoint resource and creates the network interface.
//if it has not been created yet, then we return and wait for the resource to be processed by the tunnel operator.
if endpoint.Status.TunnelIFaceName == "" {
log.Info("the tunnel network interface is not ready")
return ctrl.Result{RequeueAfter: r.RetryTimeout}, nil
}
// examine DeletionTimestamp to determine if object is under deletion
if endpoint.ObjectMeta.DeletionTimestamp.IsZero() {
if !liqonetOperator.ContainsString(endpoint.ObjectMeta.Finalizers, routeOperatorFinalizer) {
Expand All @@ -120,13 +128,18 @@ func (r *RouteController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
}
} else {
//the object is being deleted
//if we encounter an error while removing the iptables rules or the routes, then we record an
//event on the resource to notify the user;
//in that case the finalizer is not removed
if liqonetOperator.ContainsString(endpoint.Finalizers, routeOperatorFinalizer) {
if err := r.deleteIPTablesRulespecForRemoteCluster(&endpoint); err != nil {
r.Log.Error(err, "error while deleting rulespec from iptables")
r.Recorder.Event(&endpoint, "Warning", "Processing", err.Error())
return ctrl.Result{RequeueAfter: r.RetryTimeout}, err
}
if err := r.deleteRoutesPerCluster(&endpoint); err != nil {
r.Log.Error(err, "error while deleting routes")
r.Recorder.Event(&endpoint, "Warning", "Processing", err.Error())
return ctrl.Result{RequeueAfter: r.RetryTimeout}, err
}
//remove the finalizer from the list and update it.
Expand All @@ -144,16 +157,26 @@ func (r *RouteController) Reconcile(req ctrl.Request) (ctrl.Result, error) {

if err := r.createAndInsertIPTablesChains(); err != nil {
r.Log.Error(err, "unable to create iptables chains")
r.Recorder.Event(&endpoint, "Warning", "Processing", err.Error())
return ctrl.Result{RequeueAfter: r.RetryTimeout}, err
} else {
r.Recorder.Event(&endpoint, "Normal", "Processing", "iptables chains inserted")
}
if err := r.addIPTablesRulespecForRemoteCluster(&endpoint); err != nil {
log.Error(err, "unable to insert ruleSpec")
r.Recorder.Event(&endpoint, "Warning", "Processing", err.Error())
return ctrl.Result{RequeueAfter: r.RetryTimeout}, err
} else {
r.Recorder.Event(&endpoint, "Normal", "Processing", "iptables rules inserted")
}
if err := r.InsertRoutesPerCluster(&endpoint); err != nil {
log.Error(err, "unable to insert routes")
r.Recorder.Event(&endpoint, "Warning", "Processing", err.Error())
return ctrl.Result{RequeueAfter: r.RetryTimeout}, err
} else {
r.Recorder.Event(&endpoint, "Normal", "Processing", "routes inserted")
}

return ctrl.Result{RequeueAfter: r.RetryTimeout}, nil
}

Expand Down Expand Up @@ -196,7 +219,7 @@ func (r *RouteController) createAndInsertIPTablesChains() error {
} else {
log.Info("created", "chain", LiqonetPreroutingChain, "in table", NatTable)
}
r.IPTablesChains[LiqonetPostroutingChain] = liqonetOperator.IPTableChain{
r.IPTablesChains[LiqonetPreroutingChain] = liqonetOperator.IPTableChain{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity: why did we move to the PreroutingChain?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a small bug: we were saving the LiqonetPreroutingChain under the LiqonetPostroutingChain key in IPTablesChains. Looking at line 217, the code there is creating the LiqonetPreroutingChain, so that is the key it should be stored under.

Table: NatTable,
Name: LiqonetPreroutingChain,
}
Expand Down Expand Up @@ -250,7 +273,7 @@ func (r *RouteController) createAndInsertIPTablesChains() error {
if err = liqonetOperator.InsertIptablesRulespecIfNotExists(ipt, FilterTable, "INPUT", forwardToLiqonetInputSpec); err != nil {
return err
} else {
log.Info("installed", "rulespec", strings.Join(forwardToLiqonetInputSpec, " "), "belonging to chain POSTROUTING in table", FilterTable)
log.Info("installed", "rulespec", strings.Join(forwardToLiqonetInputSpec, " "), "belonging to chain INPUT in table", FilterTable)
}
r.IPTablesRuleSpecsReferencingChains[strings.Join(forwardToLiqonetInputSpec, " ")] = liqonetOperator.IPtableRule{
Table: FilterTable,
Expand Down Expand Up @@ -294,7 +317,7 @@ func (r *RouteController) addIPTablesRulespecForRemoteCluster(endpoint *netv1alp
if err = ipt.AppendUnique(NatTable, LiqonetPostroutingChain, ruleSpec...); err != nil {
return fmt.Errorf("unable to insert iptable rule \"%s\" in %s table, %s chain: %v", ruleSpec, NatTable, LiqonetPostroutingChain, err)
} else {
log.Info("installed", "rulespec", strings.Join(ruleSpec, " "), "belonging to chain", LiqonetPostroutingChain, "in table", FilterTable)
log.Info("installed", "rulespec", strings.Join(ruleSpec, " "), "belonging to chain", LiqonetPostroutingChain, "in table", NatTable)
}
ruleSpecs = append(ruleSpecs, liqonetOperator.IPtableRule{
Table: NatTable,
Expand Down
3 changes: 1 addition & 2 deletions internal/liqonet/route-operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func getRouteController() *RouteController {
Log: ctrl.Log.WithName("route-operator"),
Scheme: nil,
clientset: kubernetes.Clientset{},
RouteOperator: false,
NodeName: "test",
ClientSet: nil,
RemoteVTEPs: nil,
Expand Down Expand Up @@ -81,7 +80,7 @@ func TestCreateAndInsertIPTablesChains(t *testing.T) {
for i := 3; i >= 0; i-- {
err := r.createAndInsertIPTablesChains()
assert.Nil(t, err, "error should be nil")
assert.Equal(t, 3, len(r.IPTablesChains), "there should be three new chains")
assert.Equal(t, 4, len(r.IPTablesChains), "there should be 4 new chains")
assert.Equal(t, 4, len(r.IPTablesRuleSpecsReferencingChains), "there should be 4 new rules")
}
}
Expand Down
82 changes: 40 additions & 42 deletions internal/liqonet/tunnel-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
liqonetOperator "github.com/liqotech/liqo/pkg/liqonet"
"github.com/vishvananda/netlink"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"os"
"os/signal"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -33,6 +35,7 @@ type TunnelController struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
TunnelIFacesPerRemoteCluster map[string]int
RetryTimeout time.Duration
}
Expand All @@ -46,14 +49,14 @@ func (r *TunnelController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
var endpoint netv1alpha1.TunnelEndpoint
//name of our finalizer
tunnelEndpointFinalizer := "tunnelEndpointFinalizer.net.liqo.io"

if err := r.Get(ctx, req.NamespacedName, &endpoint); err != nil {
log.Error(err, "unable to fetch endpoint, probably it has been deleted")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
//if the endpoint CR is not processed then return
if endpoint.Status.Phase != "Processed" && endpoint.Status.Phase != "Ready" {
log.Info("tunnelEndpoint is not ready ", "name", endpoint.Name, "phase", endpoint.Status.Phase)
//we wait for the resource to be ready. The resource is created in two steps: first the spec and metadata fields,
//then the status field. So we wait for the status to be ready.
if endpoint.Status.Phase != "Ready" {
log.Info("the resource", "with name", endpoint.Name, "is not ready")
return ctrl.Result{RequeueAfter: r.RetryTimeout}, nil
}
// examine DeletionTimestamp to determine if object is under deletion
Expand All @@ -72,8 +75,11 @@ func (r *TunnelController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
//the object is being deleted
if liqonetOperator.ContainsString(endpoint.Finalizers, tunnelEndpointFinalizer) {
if err := liqonetOperator.RemoveGreTunnel(&endpoint); err != nil {
//record an event and return
r.Recorder.Event(&endpoint, "Warning", "Processing", err.Error())
return ctrl.Result{}, err
}
r.Recorder.Event(&endpoint, "Normal", "Processing", "tunnel network interface removed")
//safe to do, even if the key does not exist in the map
delete(r.TunnelIFacesPerRemoteCluster, endpoint.Spec.ClusterID)
log.Info("tunnel iface removed")
Expand All @@ -85,49 +91,41 @@ func (r *TunnelController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
}
return ctrl.Result{RequeueAfter: r.RetryTimeout}, nil
}
//try to install the GRE tunnel if it does not exist
iFaceIndex, iFaceName, err := liqonetOperator.InstallGreTunnel(&endpoint)
if err != nil {
log.Error(err, "unable to create the gre tunnel")
r.Recorder.Event(&endpoint, "Warning", "Processing", err.Error())
return ctrl.Result{RequeueAfter: r.RetryTimeout}, err
}
r.Recorder.Event(&endpoint, "Normal", "Processing", "tunnel network interface installed")
log.Info("gre tunnel installed", "index", iFaceIndex, "name", iFaceName)
//save the IFace index in the map
r.TunnelIFacesPerRemoteCluster[endpoint.Spec.ClusterID] = iFaceIndex
log.Info("installed gretunel with index: " + iFaceName)

//update the status of the endpoint custom resource
//and install the tunnel only
//check if the CR is newly created
if endpoint.Status.Phase == "Processed" {
iFaceIndex, iFaceName, err := liqonetOperator.InstallGreTunnel(&endpoint)
if err != nil {
log.Error(err, "unable to create the gre tunnel")
return ctrl.Result{RequeueAfter: r.RetryTimeout}, err
//update the status of CR if needed
//here we recover from conflicting resource versions
retryError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
toBeUpdated := false
if endpoint.Status.TunnelIFaceName != iFaceName {
endpoint.Status.TunnelIFaceName = iFaceName
toBeUpdated = true
}
log.Info("gre tunnel installed", "index", iFaceIndex, "name", iFaceName)
//save the IFace index in the map
r.TunnelIFacesPerRemoteCluster[endpoint.Spec.ClusterID] = iFaceIndex
log.Info("installed gretunel with index: " + iFaceName)
//update the status of CR
localTunnelPublicIP, err := liqonetOperator.GetLocalTunnelPublicIPToString()
if err != nil {
log.Error(err, "unable to get localTunnelPublicIP")
if endpoint.Status.TunnelIFaceIndex != iFaceIndex {
endpoint.Status.TunnelIFaceIndex = iFaceIndex
toBeUpdated = true
}
endpoint.Status.TunnelIFaceName = iFaceName
endpoint.Status.TunnelIFaceIndex = iFaceIndex
endpoint.Status.LocalTunnelPublicIP = localTunnelPublicIP
endpoint.Status.RemoteTunnelPublicIP = endpoint.Spec.TunnelPublicIP
endpoint.Status.Phase = "Ready"
err = r.Client.Status().Update(ctx, &endpoint)
if err != nil {
return ctrl.Result{RequeueAfter: r.RetryTimeout}, err
if toBeUpdated {
err = r.Status().Update(context.Background(), &endpoint)
return err
}
} else if endpoint.Status.Phase == "Ready" {
//set the label that the resource have been processed by tunnel-operator
endpoint.ObjectMeta.SetLabels(liqonetOperator.SetLabelHandler(liqonetOperator.TunOpLabelKey, "ready", endpoint.ObjectMeta.GetLabels()))
err := r.Client.Update(ctx, &endpoint)
if err != nil {
return ctrl.Result{RequeueAfter: r.RetryTimeout}, err
}

} else {
return ctrl.Result{RequeueAfter: r.RetryTimeout}, nil
return nil
})
if retryError != nil {
log.Error(retryError, "unable to create the gre tunnel")
return ctrl.Result{RequeueAfter: r.RetryTimeout}, retryError
}
//save the IFace index in the map
//we come here only if the tunnel is installed and the CR status has been updated
r.TunnelIFacesPerRemoteCluster[endpoint.Spec.ClusterID] = endpoint.Status.TunnelIFaceIndex

return ctrl.Result{RequeueAfter: r.RetryTimeout}, nil
}

Expand Down
Loading