From bdc73956702f875b057d3aac13e5c4b9090ac0c5 Mon Sep 17 00:00:00 2001 From: alacuku Date: Thu, 17 Dec 2020 14:09:44 +0100 Subject: [PATCH] race condition fix --- .../tunnelEndpointCreator-operator.go | 60 ++++++++++--------- pkg/liqonet/overlay/overlay.go | 54 +---------------- pkg/liqonet/wireguard/utils.go | 58 ++++++++++++++++++ 3 files changed, 91 insertions(+), 81 deletions(-) create mode 100644 pkg/liqonet/wireguard/utils.go diff --git a/internal/liqonet/tunnelEndpointCreator/tunnelEndpointCreator-operator.go b/internal/liqonet/tunnelEndpointCreator/tunnelEndpointCreator-operator.go index b0dc1a0742..79115519f9 100644 --- a/internal/liqonet/tunnelEndpointCreator/tunnelEndpointCreator-operator.go +++ b/internal/liqonet/tunnelEndpointCreator/tunnelEndpointCreator-operator.go @@ -311,22 +311,10 @@ func (tec *TunnelEndpointCreator) GetNetworkConfig(destinationClusterID string) if len(networkConfigList.Items) != 1 { if len(networkConfigList.Items) == 0 { return nil, false, nil + } else { + klog.Errorf("more than one instances of type %s exists for remote cluster %s", netv1alpha1.GroupVersion.String(), clusterID) + return nil, false, fmt.Errorf("multiple instances of %s for remote cluster %s", netv1alpha1.GroupVersion.String(), clusterID) } - //if we find a duplicated resource for a remote cluster than we delete them, and recreate it. - //sometimes a race condition could happen when k8s api-server is under heavy load - //the resource has already ben submitted for creation to the api-server but when we check it has not been yet - //created and so we recreate it having so the same resource duplicated but with different name because we use labels - //to track a resource based on the cluster destination. - klog.Warningf("more than one instances of type %s exists for remote cluster %s, deleting them", netv1alpha1.GroupVersion.String(), clusterID) - for _, netConfig := range networkConfigList.Items { - nc := netConfig - klog.Infof("deleting resource %s with GVR %s, because it is duplicated", netConfig.Name, netConfig.GroupVersionKind().String()) - if err := tec.Delete(context.Background(), &nc); err != nil { - klog.Errorf("an error occurred while deleting resource %s: %v", netConfig.Name, err) - return nil, false, err - } - } - return nil, false, nil } return &networkConfigList.Items[0], true, nil } @@ -393,14 +381,37 @@ func (tec *TunnelEndpointCreator) processRemoteNetConfig(netConfig *netv1alpha1. } func (tec *TunnelEndpointCreator) processLocalNetConfig(netConfig *netv1alpha1.NetworkConfig) error { + //first check that this is the only resource for the remote cluster + netConfigList := &netv1alpha1.NetworkConfigList{} + labels := client.MatchingLabels{crdReplicator.DestinationLabel: netConfig.Labels[crdReplicator.DestinationLabel]} + err := tec.List(context.Background(), netConfigList, labels) + if err != nil { + klog.Errorf("an error occurred while listing resources: %s", err) + return err + } + if len(netConfigList.Items) != 1 { + if len(netConfigList.Items) == 0 { + return nil + } + klog.Warningf("more than one instances of type %s exists for remote cluster %s, deleting them", netv1alpha1.GroupVersion.String(), netConfig.Spec.ClusterID) + for _, netConfig := range netConfigList.Items { + nc := netConfig + klog.Infof("deleting resource %s with GVR %s, because it is duplicated", netConfig.Name, netConfig.GroupVersionKind().String()) + if err := tec.Delete(context.Background(), &nc); err != nil { + klog.Errorf("an error occurred while deleting resource %s: %v", netConfig.Name, err) + return err + } + } + return nil + } //check if the resource has been processed by the remote cluster if netConfig.Status.PodCIDRNAT == "" { return nil } //we get the remote netconfig related to this one - netConfigList := &netv1alpha1.NetworkConfigList{} - labels := client.MatchingLabels{crdReplicator.RemoteLabelSelector: netConfig.Spec.ClusterID} - err := tec.List(context.Background(), netConfigList, labels) + netConfigList = &netv1alpha1.NetworkConfigList{} + labels = client.MatchingLabels{crdReplicator.RemoteLabelSelector: netConfig.Spec.ClusterID} + err = tec.List(context.Background(), netConfigList, labels) if err != nil { klog.Errorf("an error occurred while listing resources: %s", err) return err @@ -602,17 +613,10 @@ func (tec *TunnelEndpointCreator) GetTunnelEndpoint(destinationClusterID string) if len(tunEndpointList.Items) != 1 { if len(tunEndpointList.Items) == 0 { return nil, false, nil + } else { + klog.Errorf("more than one instances of type %s exists for remote cluster %s", netv1alpha1.GroupVersion.String(), clusterID) + return nil, false, fmt.Errorf("multiple instances of %s for remote cluster %s", netv1alpha1.GroupVersion.String(), clusterID) } - klog.Warningf("more than one instances of type %s exists for remote cluster %s, deleting them", netv1alpha1.GroupVersion.String(), clusterID) - for _, tep := range tunEndpointList.Items { - tepTemp := tep - klog.Infof("deleting resource %s with GVR %s, because it is duplicated", tepTemp.Name, tepTemp.GroupVersionKind().String()) - if err := tec.Delete(context.Background(), &tepTemp); err != nil { - klog.Errorf("an error occurred while deleting resource %s: %v", tepTemp.Name, err) - return nil, false, err - } - } - return nil, false, nil } return &tunEndpointList.Items[0], true, nil } diff --git a/pkg/liqonet/overlay/overlay.go b/pkg/liqonet/overlay/overlay.go index 619b63652a..797459f4ff 100644 --- a/pkg/liqonet/overlay/overlay.go +++ b/pkg/liqonet/overlay/overlay.go @@ -1,13 +1,7 @@ package overlay import ( - "context" - "fmt" "github.com/liqotech/liqo/pkg/liqonet/wireguard" - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8s "k8s.io/client-go/kubernetes" "strings" ) @@ -27,7 +21,7 @@ var ( func CreateInterface(nodeName, namespace, ipAddr string, c *k8s.Clientset) (*wireguard.Wireguard, error) { secretName := strings.Join([]string{secretPrefix, nodeName}, "") - priv, pub, err := getKeys(secretName, namespace, c) + priv, pub, err := wireguard.GetKeys(secretName, namespace, c) if err != nil { return nil, err } @@ -46,52 +40,6 @@ func CreateInterface(nodeName, namespace, ipAddr string, c *k8s.Clientset) (*wir return wg, nil } -func getKeys(secretName, namespace string, c *k8s.Clientset) (priv, pub wgtypes.Key, err error) { - //first we check if a secret containing valid keys already exists - s, err := c.CoreV1().Secrets(namespace).Get(context.Background(), secretName, metav1.GetOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return priv, pub, err - } - //if the secret does not exist then keys are generated and saved into a secret - if apierrors.IsNotFound(err) { - // generate private and public keys - if priv, err = wgtypes.GeneratePrivateKey(); err != nil { - return priv, pub, fmt.Errorf("error generating private key for wireguard backend: %v", err) - } - pub = priv.PublicKey() - pKey := corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: secretName, - Namespace: namespace, - }, - StringData: map[string]string{wireguard.PublicKey: pub.String(), wireguard.PrivateKey: priv.String()}, - } - _, err = c.CoreV1().Secrets(namespace).Create(context.Background(), &pKey, metav1.CreateOptions{}) - if err != nil { - return priv, pub, fmt.Errorf("failed to create the secret with name %s: %v", secretName, err) - } - return priv, pub, nil - } - //get the keys from the existing secret and set them - privKey, found := s.Data[wireguard.PrivateKey] - if !found { - return priv, pub, fmt.Errorf("no data with key '%s' found in secret %s", wireguard.PrivateKey, secretName) - } - priv, err = wgtypes.ParseKey(string(privKey)) - if err != nil { - return priv, pub, fmt.Errorf("an error occurred while parsing the private key for the wireguard driver :%v", err) - } - pubKey, found := s.Data[wireguard.PublicKey] - if !found { - return priv, pub, fmt.Errorf("no data with key '%s' found in secret %s", wireguard.PublicKey, secretName) - } - pub, err = wgtypes.ParseKey(string(pubKey)) - if err != nil { - return priv, pub, fmt.Errorf("an error occurred while parsing the public key for the wireguard driver :%v", err) - } - return priv, pub, nil -} - func GetOverlayIP(ip string) string { tokens := strings.Split(ip, ".") return strings.Join([]string{NetworkPrefix, tokens[1], tokens[2], tokens[3]}, ".") diff --git a/pkg/liqonet/wireguard/utils.go b/pkg/liqonet/wireguard/utils.go new file mode 100644 index 0000000000..ac244cb0d3 --- /dev/null +++ b/pkg/liqonet/wireguard/utils.go @@ -0,0 +1,58 @@ +package wireguard + +import ( + "context" + "fmt" + "github.com/liqotech/liqo/pkg/liqonet/tunnel/wireguard" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8s "k8s.io/client-go/kubernetes" +) + +func GetKeys(secretName, namespace string, c *k8s.Clientset) (priv, pub wgtypes.Key, err error) { + //first we check if a secret containing valid keys already exists + s, err := c.CoreV1().Secrets(namespace).Get(context.Background(), secretName, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return priv, pub, err + } + //if the secret does not exist then keys are generated and saved into a secret + if apierrors.IsNotFound(err) { + // generate private and public keys + if priv, err = wgtypes.GeneratePrivateKey(); err != nil { + return priv, pub, fmt.Errorf("error generating private key for wireguard backend: %v", err) + } + pub = priv.PublicKey() + pKey := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: namespace, + }, + StringData: map[string]string{wireguard.PublicKey: pub.String(), wireguard.PrivateKey: priv.String()}, + } + _, err = c.CoreV1().Secrets(namespace).Create(context.Background(), &pKey, metav1.CreateOptions{}) + if err != nil { + return priv, pub, fmt.Errorf("failed to create the secret with name %s: %v", secretName, err) + } + return priv, pub, nil + } + //get the keys from the existing secret and set them + privKey, found := s.Data[wireguard.PrivateKey] + if !found { + return priv, pub, fmt.Errorf("no data with key '%s' found in secret %s", wireguard.PrivateKey, secretName) + } + priv, err = wgtypes.ParseKey(string(privKey)) + if err != nil { + return priv, pub, fmt.Errorf("an error occurred while parsing the private key for the wireguard driver :%v", err) + } + pubKey, found := s.Data[wireguard.PublicKey] + if !found { + return priv, pub, fmt.Errorf("no data with key '%s' found in secret %s", wireguard.PublicKey, secretName) + } + pub, err = wgtypes.ParseKey(string(pubKey)) + if err != nil { + return priv, pub, fmt.Errorf("an error occurred while parsing the public key for the wireguard driver :%v", err) + } + return priv, pub, nil +}