Skip to content

Commit

Permalink
race condition fix
Browse files Browse the repository at this point in the history
  • Loading branch information
alacuku committed Dec 17, 2020
1 parent 436bec7 commit bdc7395
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,22 +311,10 @@ func (tec *TunnelEndpointCreator) GetNetworkConfig(destinationClusterID string)
if len(networkConfigList.Items) != 1 {
if len(networkConfigList.Items) == 0 {
return nil, false, nil
} else {
klog.Errorf("more than one instances of type %s exists for remote cluster %s", netv1alpha1.GroupVersion.String(), clusterID)
return nil, false, fmt.Errorf("multiple instances of %s for remote cluster %s", netv1alpha1.GroupVersion.String(), clusterID)
}
//if we find a duplicated resource for a remote cluster than we delete them, and recreate it.
//sometimes a race condition could happen when k8s api-server is under heavy load
//the resource has already ben submitted for creation to the api-server but when we check it has not been yet
//created and so we recreate it having so the same resource duplicated but with different name because we use labels
//to track a resource based on the cluster destination.
klog.Warningf("more than one instances of type %s exists for remote cluster %s, deleting them", netv1alpha1.GroupVersion.String(), clusterID)
for _, netConfig := range networkConfigList.Items {
nc := netConfig
klog.Infof("deleting resource %s with GVR %s, because it is duplicated", netConfig.Name, netConfig.GroupVersionKind().String())
if err := tec.Delete(context.Background(), &nc); err != nil {
klog.Errorf("an error occurred while deleting resource %s: %v", netConfig.Name, err)
return nil, false, err
}
}
return nil, false, nil
}
return &networkConfigList.Items[0], true, nil
}
Expand Down Expand Up @@ -393,14 +381,37 @@ func (tec *TunnelEndpointCreator) processRemoteNetConfig(netConfig *netv1alpha1.
}

func (tec *TunnelEndpointCreator) processLocalNetConfig(netConfig *netv1alpha1.NetworkConfig) error {
//first check that this is the only resource for the remote cluster
netConfigList := &netv1alpha1.NetworkConfigList{}
labels := client.MatchingLabels{crdReplicator.DestinationLabel: netConfig.Labels[crdReplicator.DestinationLabel]}
err := tec.List(context.Background(), netConfigList, labels)
if err != nil {
klog.Errorf("an error occurred while listing resources: %s", err)
return err
}
if len(netConfigList.Items) != 1 {
if len(netConfigList.Items) == 0 {
return nil
}
klog.Warningf("more than one instances of type %s exists for remote cluster %s, deleting them", netv1alpha1.GroupVersion.String(), netConfig.Spec.ClusterID)
for _, netConfig := range netConfigList.Items {
nc := netConfig
klog.Infof("deleting resource %s with GVR %s, because it is duplicated", netConfig.Name, netConfig.GroupVersionKind().String())
if err := tec.Delete(context.Background(), &nc); err != nil {
klog.Errorf("an error occurred while deleting resource %s: %v", netConfig.Name, err)
return err
}
}
return nil
}
//check if the resource has been processed by the remote cluster
if netConfig.Status.PodCIDRNAT == "" {
return nil
}
//we get the remote netconfig related to this one
netConfigList := &netv1alpha1.NetworkConfigList{}
labels := client.MatchingLabels{crdReplicator.RemoteLabelSelector: netConfig.Spec.ClusterID}
err := tec.List(context.Background(), netConfigList, labels)
netConfigList = &netv1alpha1.NetworkConfigList{}
labels = client.MatchingLabels{crdReplicator.RemoteLabelSelector: netConfig.Spec.ClusterID}
err = tec.List(context.Background(), netConfigList, labels)
if err != nil {
klog.Errorf("an error occurred while listing resources: %s", err)
return err
Expand Down Expand Up @@ -602,17 +613,10 @@ func (tec *TunnelEndpointCreator) GetTunnelEndpoint(destinationClusterID string)
if len(tunEndpointList.Items) != 1 {
if len(tunEndpointList.Items) == 0 {
return nil, false, nil
} else {
klog.Errorf("more than one instances of type %s exists for remote cluster %s", netv1alpha1.GroupVersion.String(), clusterID)
return nil, false, fmt.Errorf("multiple instances of %s for remote cluster %s", netv1alpha1.GroupVersion.String(), clusterID)
}
klog.Warningf("more than one instances of type %s exists for remote cluster %s, deleting them", netv1alpha1.GroupVersion.String(), clusterID)
for _, tep := range tunEndpointList.Items {
tepTemp := tep
klog.Infof("deleting resource %s with GVR %s, because it is duplicated", tepTemp.Name, tepTemp.GroupVersionKind().String())
if err := tec.Delete(context.Background(), &tepTemp); err != nil {
klog.Errorf("an error occurred while deleting resource %s: %v", tepTemp.Name, err)
return nil, false, err
}
}
return nil, false, nil
}
return &tunEndpointList.Items[0], true, nil
}
Expand Down
54 changes: 1 addition & 53 deletions pkg/liqonet/overlay/overlay.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
package overlay

import (
"context"
"fmt"
"github.com/liqotech/liqo/pkg/liqonet/wireguard"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8s "k8s.io/client-go/kubernetes"
"strings"
)
Expand All @@ -27,7 +21,7 @@ var (

func CreateInterface(nodeName, namespace, ipAddr string, c *k8s.Clientset) (*wireguard.Wireguard, error) {
secretName := strings.Join([]string{secretPrefix, nodeName}, "")
priv, pub, err := getKeys(secretName, namespace, c)
priv, pub, err := wireguard.GetKeys(secretName, namespace, c)
if err != nil {
return nil, err
}
Expand All @@ -46,52 +40,6 @@ func CreateInterface(nodeName, namespace, ipAddr string, c *k8s.Clientset) (*wir
return wg, nil
}

func getKeys(secretName, namespace string, c *k8s.Clientset) (priv, pub wgtypes.Key, err error) {
//first we check if a secret containing valid keys already exists
s, err := c.CoreV1().Secrets(namespace).Get(context.Background(), secretName, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return priv, pub, err
}
//if the secret does not exist then keys are generated and saved into a secret
if apierrors.IsNotFound(err) {
// generate private and public keys
if priv, err = wgtypes.GeneratePrivateKey(); err != nil {
return priv, pub, fmt.Errorf("error generating private key for wireguard backend: %v", err)
}
pub = priv.PublicKey()
pKey := corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretName,
Namespace: namespace,
},
StringData: map[string]string{wireguard.PublicKey: pub.String(), wireguard.PrivateKey: priv.String()},
}
_, err = c.CoreV1().Secrets(namespace).Create(context.Background(), &pKey, metav1.CreateOptions{})
if err != nil {
return priv, pub, fmt.Errorf("failed to create the secret with name %s: %v", secretName, err)
}
return priv, pub, nil
}
//get the keys from the existing secret and set them
privKey, found := s.Data[wireguard.PrivateKey]
if !found {
return priv, pub, fmt.Errorf("no data with key '%s' found in secret %s", wireguard.PrivateKey, secretName)
}
priv, err = wgtypes.ParseKey(string(privKey))
if err != nil {
return priv, pub, fmt.Errorf("an error occurred while parsing the private key for the wireguard driver :%v", err)
}
pubKey, found := s.Data[wireguard.PublicKey]
if !found {
return priv, pub, fmt.Errorf("no data with key '%s' found in secret %s", wireguard.PublicKey, secretName)
}
pub, err = wgtypes.ParseKey(string(pubKey))
if err != nil {
return priv, pub, fmt.Errorf("an error occurred while parsing the public key for the wireguard driver :%v", err)
}
return priv, pub, nil
}

func GetOverlayIP(ip string) string {
tokens := strings.Split(ip, ".")
return strings.Join([]string{NetworkPrefix, tokens[1], tokens[2], tokens[3]}, ".")
Expand Down
58 changes: 58 additions & 0 deletions pkg/liqonet/wireguard/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package wireguard

import (
"context"
"fmt"
"github.com/liqotech/liqo/pkg/liqonet/tunnel/wireguard"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8s "k8s.io/client-go/kubernetes"
)

func GetKeys(secretName, namespace string, c *k8s.Clientset) (priv, pub wgtypes.Key, err error) {
//first we check if a secret containing valid keys already exists
s, err := c.CoreV1().Secrets(namespace).Get(context.Background(), secretName, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return priv, pub, err
}
//if the secret does not exist then keys are generated and saved into a secret
if apierrors.IsNotFound(err) {
// generate private and public keys
if priv, err = wgtypes.GeneratePrivateKey(); err != nil {
return priv, pub, fmt.Errorf("error generating private key for wireguard backend: %v", err)
}
pub = priv.PublicKey()
pKey := corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretName,
Namespace: namespace,
},
StringData: map[string]string{wireguard.PublicKey: pub.String(), wireguard.PrivateKey: priv.String()},
}
_, err = c.CoreV1().Secrets(namespace).Create(context.Background(), &pKey, metav1.CreateOptions{})
if err != nil {
return priv, pub, fmt.Errorf("failed to create the secret with name %s: %v", secretName, err)
}
return priv, pub, nil
}
//get the keys from the existing secret and set them
privKey, found := s.Data[wireguard.PrivateKey]
if !found {
return priv, pub, fmt.Errorf("no data with key '%s' found in secret %s", wireguard.PrivateKey, secretName)
}
priv, err = wgtypes.ParseKey(string(privKey))
if err != nil {
return priv, pub, fmt.Errorf("an error occurred while parsing the private key for the wireguard driver :%v", err)
}
pubKey, found := s.Data[wireguard.PublicKey]
if !found {
return priv, pub, fmt.Errorf("no data with key '%s' found in secret %s", wireguard.PublicKey, secretName)
}
pub, err = wgtypes.ParseKey(string(pubKey))
if err != nil {
return priv, pub, fmt.Errorf("an error occurred while parsing the public key for the wireguard driver :%v", err)
}
return priv, pub, nil
}

0 comments on commit bdc7395

Please sign in to comment.