From 80694cf1e1dd9c267185543f4646f8319299ff10 Mon Sep 17 00:00:00 2001 From: davidefalcone1 Date: Tue, 20 Apr 2021 16:38:09 +0000 Subject: [PATCH] Endpoint reflection with ExternalCIDR --- Makefile | 22 + apis/net/v1alpha1/ipamstorage_types.go | 29 +- apis/net/v1alpha1/zz_generated.deepcopy.go | 44 ++ cmd/liqonet/main.go | 5 +- .../liqo/crds/net.liqo.io_ipamstorages.yaml | 57 +- .../liqo-network-manager-deployment.yaml | 3 + .../liqo-network-manager-service.yaml | 18 + go.mod | 3 + .../tunnelEndpointCreator-config.go | 11 +- .../tunnelEndpointCreator-operator.go | 22 +- pkg/consts/liqonet.go | 12 + pkg/consts/networkManager.go | 6 - pkg/liqonet/ipam.go | 442 +++++++++-- pkg/liqonet/ipam.pb.go | 348 +++++++++ pkg/liqonet/ipam.proto | 24 + pkg/liqonet/ipamStorage.go | 61 +- pkg/liqonet/ipam_grpc.pb.go | 138 ++++ pkg/liqonet/ipam_test.go | 710 ++++++++++++++---- pkg/liqonet/test/doc.go | 2 + pkg/liqonet/test/ipam_mock.go | 35 + pkg/liqonet/utils.go | 40 +- pkg/liqonet/utils_test.go | 21 + .../reflectors/outgoing/apiTypes.go | 16 + .../reflectors/outgoing/endpointSlices.go | 23 +- pkg/virtualKubelet/forge/forge_suite_test.go | 13 - pkg/virtualKubelet/forge/pods.go | 39 +- pkg/virtualKubelet/forge/pods_test.go | 31 - .../reflection/endpointslices_test.go | 4 + 28 files changed, 1851 insertions(+), 328 deletions(-) create mode 100644 deployments/liqo/templates/liqo-network-manager-service.yaml create mode 100644 pkg/consts/liqonet.go delete mode 100644 pkg/consts/networkManager.go create mode 100644 pkg/liqonet/ipam.pb.go create mode 100644 pkg/liqonet/ipam.proto create mode 100644 pkg/liqonet/ipam_grpc.pb.go create mode 100644 pkg/liqonet/test/doc.go create mode 100644 pkg/liqonet/test/ipam_mock.go delete mode 100644 pkg/virtualKubelet/forge/forge_suite_test.go delete mode 100644 pkg/virtualKubelet/forge/pods_test.go diff --git a/Makefile b/Makefile index 66e12bc97f..c7af12973a 100644 --- a/Makefile +++ b/Makefile @@ -79,6 +79,28 @@ vet: generate: controller-gen $(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..." +# Generate gRPC files +grpc: protoc + $(PROTOC) --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pkg/liqonet/ipam.proto + +protoc: +ifeq (, $(shell which protoc)) + @{ \ + PB_REL="https://github.com/protocolbuffers/protobuf/releases" ;\ + version=3.15.5 ;\ + arch=x86_64 ;\ + curl -LO $${PB_REL}/download/v$${version}/protoc-$${version}-linux-$${arch}.zip ;\ + unzip protoc-$${version}-linux-$${arch}.zip -d $${HOME}/.local ;\ + rm protoc-$${version}-linux-$${arch}.zip ;\ + PROTOC_TMP_DIR=$$(mktemp -d) ;\ + cd $$PROTOC_TMP_DIR ;\ + go mod init tmp ;\ + go get google.golang.org/protobuf/cmd/protoc-gen-go ;\ + go get google.golang.org/grpc/cmd/protoc-gen-go-grpc ;\ + rm -rf $$PROTOC_TMP_DIR ;\ + } +endif +PROTOC=$(shell which protoc) # find or download controller-gen # download controller-gen if necessary diff --git a/apis/net/v1alpha1/ipamstorage_types.go b/apis/net/v1alpha1/ipamstorage_types.go index a430f0ba80..57fddc0e41 100644 --- a/apis/net/v1alpha1/ipamstorage_types.go +++ b/apis/net/v1alpha1/ipamstorage_types.go @@ -23,12 +23,27 @@ import ( // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. +// Subnets type contains relevant networks related to a remote cluster. type Subnets struct { - PodCIDR string `json:"podCIDR"` - // Network used in remote cluster for local service endpoints. - RemoteExternalCIDR string `json:"remoteExternalCIDR"` + // Network used in the remote cluster for local Pods. Default is "None": this means remote cluster uses local cluster PodCIDR. + LocalNATPodCIDR string `json:"localNATPodCIDR"` + // Network used for Pods in the remote cluster. + RemotePodCIDR string `json:"remotePodCIDR"` + // Network used in remote cluster for local service endpoints. Default is "None": this means remote cluster uses local cluster ExternalCIDR. + LocalNATExternalCIDR string `json:"localNATExternalCIDR"` // Network used in local cluster for remote service endpoints. - LocalExternalCIDR string `json:"localExternalCIDR"` + RemoteExternalCIDR string `json:"remoteExternalCIDR"` +} + +// ClusterMapping is an empty struct. +type ClusterMapping struct{} + +// EndpointMapping describes a relation between an enpoint IP and an IP belonging to ExternalCIDR. +type EndpointMapping struct { + // IP belonging to cluster ExtenalCIDR assigned to this endpoint. + IP string `json:"ip"` + // Set of clusters to which this endpoint has been reflected. Only the key, which is the ClusterID, is useful. + ClusterMappings map[string]ClusterMapping `json:"clusterMappings"` } // IpamSpec defines the desired state of Ipam. @@ -44,6 +59,12 @@ type IpamSpec struct { ClusterSubnets map[string]Subnets `json:"clusterSubnets"` // Cluster ExternalCIDR ExternalCIDR string `json:"externalCIDR"` + // Endpoint IP mappings. Key is the IP address of the local endpoint, value is the IP of the remote endpoint, so it belongs to an ExternalCIDR + EndpointMappings map[string]EndpointMapping `json:"endpointMappings"` + // Cluster PodCIDR + PodCIDR string `json:"podCIDR"` + // ServiceCIDR + ServiceCIDR string `json:"serviceCIDR"` } // +kubebuilder:object:root=true diff --git a/apis/net/v1alpha1/zz_generated.deepcopy.go b/apis/net/v1alpha1/zz_generated.deepcopy.go index c7f7cfbc6a..c93de61380 100644 --- a/apis/net/v1alpha1/zz_generated.deepcopy.go +++ b/apis/net/v1alpha1/zz_generated.deepcopy.go @@ -23,6 +23,21 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterMapping) DeepCopyInto(out *ClusterMapping) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterMapping. +func (in *ClusterMapping) DeepCopy() *ClusterMapping { + if in == nil { + return nil + } + out := new(ClusterMapping) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Connection) DeepCopyInto(out *Connection) { *out = *in @@ -45,6 +60,28 @@ func (in *Connection) DeepCopy() *Connection { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EndpointMapping) DeepCopyInto(out *EndpointMapping) { + *out = *in + if in.ClusterMappings != nil { + in, out := &in.ClusterMappings, &out.ClusterMappings + *out = make(map[string]ClusterMapping, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EndpointMapping. +func (in *EndpointMapping) DeepCopy() *EndpointMapping { + if in == nil { + return nil + } + out := new(EndpointMapping) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *IpamList) DeepCopyInto(out *IpamList) { *out = *in @@ -107,6 +144,13 @@ func (in *IpamSpec) DeepCopyInto(out *IpamSpec) { (*out)[key] = val } } + if in.EndpointMappings != nil { + in, out := &in.EndpointMappings, &out.EndpointMappings + *out = make(map[string]EndpointMapping, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IpamSpec. diff --git a/cmd/liqonet/main.go b/cmd/liqonet/main.go index acc461e8b0..c8044358be 100644 --- a/cmd/liqonet/main.go +++ b/cmd/liqonet/main.go @@ -34,6 +34,7 @@ import ( route_operator "github.com/liqotech/liqo/internal/liqonet/route-operator" tunnel_operator "github.com/liqotech/liqo/internal/liqonet/tunnel-operator" "github.com/liqotech/liqo/internal/liqonet/tunnelEndpointCreator" + liqoconst "github.com/liqotech/liqo/pkg/consts" liqonetOperator "github.com/liqotech/liqo/pkg/liqonet" "github.com/liqotech/liqo/pkg/liqonet/wireguard" "github.com/liqotech/liqo/pkg/mapperUtils" @@ -132,9 +133,9 @@ func main() { case "tunnelEndpointCreator-operator": dynClient := dynamic.NewForConfigOrDie(mgr.GetConfig()) ipam := liqonetOperator.NewIPAM() - err = ipam.Init(liqonetOperator.Pools, dynClient) + err = ipam.Init(liqonetOperator.Pools, dynClient, liqoconst.NetworkManagerIpamPort) if err != nil { - klog.Errorf("cannot init IPAM:%s", err.Error()) + klog.Errorf("cannot init IPAM:%w", err) } r := &tunnelEndpointCreator.TunnelEndpointCreator{ Client: mgr.GetClient(), diff --git a/deployments/liqo/crds/net.liqo.io_ipamstorages.yaml b/deployments/liqo/crds/net.liqo.io_ipamstorages.yaml index 7219d3c595..afb4245b61 100644 --- a/deployments/liqo/crds/net.liqo.io_ipamstorages.yaml +++ b/deployments/liqo/crds/net.liqo.io_ipamstorages.yaml @@ -38,29 +38,66 @@ spec: properties: clusterSubnets: additionalProperties: + description: Subnets type contains relevant networks related to + a remote cluster. properties: - localExternalCIDR: - description: Network used in local cluster for remote service - endpoints. + localNATExternalCIDR: + description: 'Network used in remote cluster for local service + endpoints. Default is "None": this means remote cluster uses + local cluster ExternalCIDR.' type: string - podCIDR: + localNATPodCIDR: + description: 'Network used in the remote cluster for local Pods. + Default is "None": this means remote cluster uses local cluster + PodCIDR.' type: string remoteExternalCIDR: - description: Network used in remote cluster for local service + description: Network used in local cluster for remote service endpoints. type: string + remotePodCIDR: + description: Network used for Pods in the remote cluster. + type: string required: - - localExternalCIDR - - podCIDR + - localNATExternalCIDR + - localNATPodCIDR - remoteExternalCIDR + - remotePodCIDR type: object description: Map used to keep track of networks assigned to clusters. Key is the remote cluster ID, value is a the set of networks used by the remote cluster. type: object + endpointMappings: + additionalProperties: + description: EndpointMapping describes a relation between an enpoint + IP and an IP belonging to ExternalCIDR. + properties: + clusterMappings: + additionalProperties: + description: ClusterMapping is an empty struct. + type: object + description: Set of clusters to which this endpoint has been + reflected. Only the key, which is the ClusterID, is useful. + type: object + ip: + description: IP belonging to cluster ExtenalCIDR assigned to + this endpoint. + type: string + required: + - clusterMappings + - ip + type: object + description: Endpoint IP mappings. Key is the IP address of the local + endpoint, value is the IP of the remote endpoint, so it belongs + to an ExternalCIDR + type: object externalCIDR: description: Cluster ExternalCIDR type: string + podCIDR: + description: Cluster PodCIDR + type: string pools: description: Network pools. items: @@ -74,11 +111,17 @@ spec: Important: Run "make" to regenerate code after modifying this file Map consumed by go-ipam module. Key is prefic cidr, value is a Prefix.' type: object + serviceCIDR: + description: ServiceCIDR + type: string required: - clusterSubnets + - endpointMappings - externalCIDR + - podCIDR - pools - prefixes + - serviceCIDR type: object type: object served: true diff --git a/deployments/liqo/templates/liqo-network-manager-deployment.yaml b/deployments/liqo/templates/liqo-network-manager-deployment.yaml index d2abc7dc53..c9590e3a63 100644 --- a/deployments/liqo/templates/liqo-network-manager-deployment.yaml +++ b/deployments/liqo/templates/liqo-network-manager-deployment.yaml @@ -29,6 +29,9 @@ spec: imagePullPolicy: {{ .Values.pullPolicy }} name: {{ $netManagerConfig.name }} command: ["/usr/bin/liqonet"] + ports: + - name: ipam-api + containerPort: 6000 args: - "-run-as=tunnelEndpointCreator-operator" resources: diff --git a/deployments/liqo/templates/liqo-network-manager-service.yaml b/deployments/liqo/templates/liqo-network-manager-service.yaml new file mode 100644 index 0000000000..cdb8f9dd57 --- /dev/null +++ b/deployments/liqo/templates/liqo-network-manager-service.yaml @@ -0,0 +1,18 @@ +--- +{{- $netManagerConfig := (merge (dict "name" "network-manager" "module" "networking") .) -}} + +apiVersion: v1 +kind: Service +metadata: + name: {{ include "liqo.prefixedName" $netManagerConfig }} + labels: + {{- include "liqo.labels" $netManagerConfig | nindent 4 }} +spec: + # This service is made to be consumed within the cluster, in particular by the virtual kubelet + type: ClusterIP + ports: + - name: ipam-api + port: 6000 + protocol: TCP + selector: + {{- include "liqo.selectorLabels" $netManagerConfig | nindent 4 }} \ No newline at end of file diff --git a/go.mod b/go.mod index 67e23e9625..fe3db92ebb 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,9 @@ require ( golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e golang.org/x/tools v0.1.0 golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200609130330-bd2cb7843e1b + google.golang.org/grpc v1.33.2 + google.golang.org/protobuf v1.25.0 + gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect gotest.tools v2.2.0+incompatible inet.af/netaddr v0.0.0-20210313195008-843b4240e319 k8s.io/api v0.21.0 diff --git a/internal/liqonet/tunnelEndpointCreator/tunnelEndpointCreator-config.go b/internal/liqonet/tunnelEndpointCreator/tunnelEndpointCreator-config.go index 3b0ccfa842..482f9d098b 100644 --- a/internal/liqonet/tunnelEndpointCreator/tunnelEndpointCreator-config.go +++ b/internal/liqonet/tunnelEndpointCreator/tunnelEndpointCreator-config.go @@ -19,15 +19,21 @@ import ( func (tec *TunnelEndpointCreator) setNetParameters(config *configv1alpha1.ClusterConfig) { podCIDR := config.Spec.LiqonetConfig.PodCIDR serviceCIDR := config.Spec.LiqonetConfig.ServiceCIDR - externalCIDR, err := tec.IPManager.GetClusterExternalCIDR(liqonet.GetMask(podCIDR)) + externalCIDR, err := tec.IPManager.GetExternalCIDR(liqonet.GetMask(podCIDR)) if err != nil { klog.Error(err) } if tec.PodCIDR != podCIDR { + if err := tec.IPManager.SetPodCIDR(podCIDR); err != nil { + klog.Error(err) + } klog.Infof("PodCIDR set to %s", podCIDR) tec.PodCIDR = podCIDR } if tec.ServiceCIDR != serviceCIDR { + if err := tec.IPManager.SetServiceCIDR(serviceCIDR); err != nil { + klog.Error(err) + } klog.Infof("ServiceCIDR set to %s", serviceCIDR) tec.ServiceCIDR = serviceCIDR } @@ -156,7 +162,6 @@ func (tec *TunnelEndpointCreator) updatePools(additionalPools []string) error { func (tec *TunnelEndpointCreator) getReservedSubnets(config *configv1alpha1.ClusterConfig) []string { reservedSubnets := make([]string, 0) liqonetConfig := config.Spec.LiqonetConfig - reservedSubnets = append(reservedSubnets, liqonetConfig.PodCIDR, liqonetConfig.ServiceCIDR) // Cast CIDR to normal string and append for _, cidr := range liqonetConfig.ReservedSubnets { reservedSubnets = append(reservedSubnets, string(cidr)) @@ -189,6 +194,7 @@ func (tec *TunnelEndpointCreator) WatchConfiguration(config *rest.Config, gv *sc go utils.WatchConfiguration(func(configuration *configv1alpha1.ClusterConfig) { reservedSubnets := tec.getReservedSubnets(configuration) additionalPools := tec.getAdditionalPools(configuration) + tec.setNetParameters(configuration) err = tec.updateReservedSubnets(reservedSubnets) if err != nil { klog.Error(err) @@ -199,7 +205,6 @@ func (tec *TunnelEndpointCreator) WatchConfiguration(config *rest.Config, gv *sc klog.Error(err) return } - tec.setNetParameters(configuration) if !tec.cfgConfigured { tec.WaitConfig.Done() klog.Infof("called done on waitgroup") diff --git a/internal/liqonet/tunnelEndpointCreator/tunnelEndpointCreator-operator.go b/internal/liqonet/tunnelEndpointCreator/tunnelEndpointCreator-operator.go index 017f393dbb..e3ccfe50b8 100644 --- a/internal/liqonet/tunnelEndpointCreator/tunnelEndpointCreator-operator.go +++ b/internal/liqonet/tunnelEndpointCreator/tunnelEndpointCreator-operator.go @@ -191,6 +191,9 @@ func (tec *TunnelEndpointCreator) Reconcile(ctx context.Context, req ctrl.Reques if err := tec.IPManager.FreeSubnetsPerCluster(netConfig.Spec.ClusterID); err != nil { klog.Errorf("cannot free networks assigned to cluster %s", netConfig.Spec.ClusterID) } + if err := tec.IPManager.RemoveLocalSubnetsPerCluster(netConfig.Spec.ClusterID); err != nil { + klog.Errorf("cannot delete local subnets assigned to cluster %s", netConfig.Spec.ClusterID) + } return result, nil } @@ -222,6 +225,8 @@ func (tec *TunnelEndpointCreator) SetupSignalHandlerForTunEndCreator() context.C go func(r *TunnelEndpointCreator) { sig := <-c klog.Infof("received signal: %s", sig.String()) + // Stop IPAM gRPC server. + tec.IPManager.StopGRPCServer() // closing shared informers close(r.ForeignClusterStopWatcher) done() @@ -472,10 +477,19 @@ func (tec *TunnelEndpointCreator) processLocalNetConfig(netConfig *netv1alpha1.N if !netConfigList.Items[0].Status.Processed { return nil } - // Store ExternalCIDR used in remote cluster - if err := tec.IPManager.AddExternalCIDRPerCluster(netConfig.Status.ExternalCIDRNAT, - netConfig.Spec.ClusterID); err != nil { - klog.Error(err) + + retryError := retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Store subnets used in remote cluster + if err := tec.IPManager.AddLocalSubnetsPerCluster(netConfig.Status.PodCIDRNAT, + netConfig.Status.ExternalCIDRNAT, + netConfig.Spec.ClusterID); err != nil { + return err + } + return nil + }) + if retryError != nil { + klog.Error(retryError) + return retryError } // at this point we have all the necessary parameters to create the tunnelEndpoint resource remoteNetConf := netConfigList.Items[0] diff --git a/pkg/consts/liqonet.go b/pkg/consts/liqonet.go new file mode 100644 index 0000000000..98650520a0 --- /dev/null +++ b/pkg/consts/liqonet.go @@ -0,0 +1,12 @@ +package consts + +const ( + // NetworkManagerIpamPort is the port used by IPAM gRPCs. + NetworkManagerIpamPort = 6000 + // NetworkManagerServiceName is the service name for IPAM gRPCs. + NetworkManagerServiceName = "liqo-network-manager" + // DefaultCIDRValue is the default value for a string that contains a CIDR. + DefaultCIDRValue = "None" + // TepReady is the ready state of TunnelEndpoint resource. + TepReady = "Ready" +) diff --git a/pkg/consts/networkManager.go b/pkg/consts/networkManager.go deleted file mode 100644 index 99755e469a..0000000000 --- a/pkg/consts/networkManager.go +++ /dev/null @@ -1,6 +0,0 @@ -package consts - -const ( - DefaultCIDRValue = "None" - TepReady = "Ready" -) diff --git a/pkg/liqonet/ipam.go b/pkg/liqonet/ipam.go index 5e49bf6a5d..7ff0d8fefb 100644 --- a/pkg/liqonet/ipam.go +++ b/pkg/liqonet/ipam.go @@ -1,9 +1,13 @@ package liqonet import ( + "context" "fmt" + "net" "strings" + grpc "google.golang.org/grpc" + netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" "k8s.io/client-go/dynamic" @@ -14,48 +18,69 @@ import ( "k8s.io/klog" ) -/* IPAM Interface. */ +// Ipam Interface. type Ipam interface { - // GetSubnetsPerCluster stores and reserves PodCIDR and ExternalCIDR for a remote cluster. + /* GetSubnetsPerCluster receives PodCIDR and ExternalCIDR of a remote cluster and checks if + those networks generate conflicts with other networks(reserved ones or even PodCIDR and + ExternalCIDR of other clusters). If no conflicts are found, networks are reserved so that + they cannot be used by any other cluster. In this way IPAM guarrantees that traffic toward these + networks is directed to only one remote cluster. If conflicts are found, received networks are + ignored and they are substituted with a new network chosen by the IPAM. These new network are + reserved as well. The remapping mechanism can be applied on: + - PodCIDR + - ExternalCIDR + - Both + */ GetSubnetsPerCluster(podCidr, externalCIDR, clusterID string) (string, string, error) - // FreeSubnetsPerCluster deletes and frees PodCIDR and ExternalCIDR for a remote cluster. + // FreeSubnetsPerCluster frees PodCIDR and ExternalCIDR for a remote cluster. FreeSubnetsPerCluster(clusterID string) error - // AcquireReservedSubnet reserves a network + // AcquireReservedSubnet reserves a network. AcquireReservedSubnet(network string) error - // FreeReservedSubnet frees a network + // FreeReservedSubnet frees a network. FreeReservedSubnet(network string) error - // AddNetworkPool adds a network to the set of network pools + // AddNetworkPool adds a network to the set of default network pools. AddNetworkPool(network string) error - // RemoveNetworkPool removes a network from the set of network pools + // RemoveNetworkPool removes a network from the set of network pools. RemoveNetworkPool(network string) error - // AddExternalCIDRPerCluster stores (without reserving) an ExternalCIDR for a remote cluster - AddExternalCIDRPerCluster(network, clusterID string) error - // RemoveExternalCIDRPerCluster deletes an ExternalCIDR for a cluster - RemoveExternalCIDRPerCluster(clusterID string) error - // GetClusterExternalCIDR eventually choose and returns the local cluster's ExternalCIDR - GetClusterExternalCIDR(mask uint8) (string, error) + /* AddLocalSubnetsPerCluster stores the PodCIDR and the ExternalCIDR used in the remote cluster to + map the local cluster subnets. Since those networks are used in the remote cluster + this function must not reserve it. If the remote cluster has not remapped + a local subnet, then CIDR value should be equal to "None". */ + AddLocalSubnetsPerCluster(podCIDR, externalCIDR, clusterID string) error + // RemoveLocalSubnetsPerCluster deletes the subnets related to a remote cluster + RemoveLocalSubnetsPerCluster(clusterID string) error + GetExternalCIDR(mask uint8) (string, error) + // SetPodCIDR sets the cluster PodCIDR. + SetPodCIDR(podCIDR string) error + // SetServiceCIDR sets the cluster ServiceCIDR. + SetServiceCIDR(serviceCIDR string) error + // StopGRPCServer stops the gRPC server gracefully. + StopGRPCServer() + IpamServer } -/* IPAM implementation. */ +// IPAM implementation. type IPAM struct { ipam goipam.Ipamer ipamStorage IpamStorage + grpcServer *grpc.Server + UnimplementedIpamServer } -/* NewIPAM returns a IPAM instance. */ +// NewIPAM returns a IPAM instance. func NewIPAM() *IPAM { return &IPAM{} } -/* Constant slice containing private IPv4 networks. */ +// Pools is a constant slice containing private IPv4 networks. var Pools = []string{ "10.0.0.0/8", "192.168.0.0/16", "172.16.0.0/12", } -/* Init uses the Ipam resource to retrieve and allocate reserved networks. */ -func (liqoIPAM *IPAM) Init(pools []string, dynClient dynamic.Interface) error { +// Init uses the Ipam resource to retrieve and allocate reserved networks. +func (liqoIPAM *IPAM) Init(pools []string, dynClient dynamic.Interface, listeningPort int) error { var err error // Set up storage liqoIPAM.ipamStorage, err = NewIPAMStorage(dynClient) @@ -84,9 +109,36 @@ func (liqoIPAM *IPAM) Init(pools []string, dynClient dynamic.Interface) error { return fmt.Errorf("cannot set pools: %w", err) } } + if listeningPort > 0 { + err = liqoIPAM.initRPCServer(listeningPort) + if err != nil { + return fmt.Errorf("cannot start gRPC server:%w", err) + } + } return nil } +func (liqoIPAM *IPAM) initRPCServer(port int) error { + lis, err := net.Listen("tcp", fmt.Sprintf("%s%d", "0.0.0.0:", port)) + if err != nil { + return err + } + liqoIPAM.grpcServer = grpc.NewServer() + RegisterIpamServer(liqoIPAM.grpcServer, liqoIPAM) + go func() { + err := liqoIPAM.grpcServer.Serve(lis) + if err != nil { + klog.Error(err) + } + }() + return nil +} + +// StopGRPCServer stops the gRPC server gracefully. +func (liqoIPAM *IPAM) StopGRPCServer() { + liqoIPAM.grpcServer.GracefulStop() +} + // reservePoolInHalves handles the special case in which a network pool has to be entirely reserved // Since AcquireSpecificChildPrefix would return an error, reservePoolInHalves acquires the two // halves of the network pool. @@ -107,7 +159,7 @@ func (liqoIPAM *IPAM) reservePoolInHalves(pool string) error { return nil } -/* AcquireReservedNetwork marks as used the network received as parameter. */ +// AcquireReservedSubnet marks as used the network received as parameter. func (liqoIPAM *IPAM) AcquireReservedSubnet(reservedNetwork string) error { klog.Infof("Request to reserve network %s has been received", reservedNetwork) cluster, overlaps, err := liqoIPAM.overlapsWithCluster(reservedNetwork) @@ -144,7 +196,7 @@ func (liqoIPAM *IPAM) overlapsWithNetwork(newNetwork, network string) (overlaps return } if err = liqoIPAM.ipam.PrefixesOverlapping([]string{network}, []string{newNetwork}); err != nil { - //overlaps + // overlaps overlaps = true err = nil return @@ -153,34 +205,25 @@ func (liqoIPAM *IPAM) overlapsWithNetwork(newNetwork, network string) (overlaps } func (liqoIPAM *IPAM) overlapsWithCluster(network string) (overlappingCluster string, overlaps bool, err error) { + var overlapsWithPodCIDR bool + var overlapsWithExternalCIDR bool // Get cluster subnets clusterSubnets, err := liqoIPAM.ipamStorage.getClusterSubnets() if err != nil { err = fmt.Errorf("cannot get Ipam config: %w", err) return } - for cluster, clusterSubnet := range clusterSubnets { - overlaps, err = liqoIPAM.overlapsWithNetwork(network, clusterSubnet.PodCIDR) - if err != nil { - return - } - if overlaps { - overlappingCluster = cluster - return - } - overlaps, err = liqoIPAM.overlapsWithNetwork(network, clusterSubnet.LocalExternalCIDR) + for cluster, subnets := range clusterSubnets { + overlapsWithPodCIDR, err = liqoIPAM.overlapsWithNetwork(network, subnets.RemotePodCIDR) if err != nil { return } - if overlaps { - overlappingCluster = cluster - return - } - overlaps, err = liqoIPAM.overlapsWithNetwork(network, clusterSubnet.RemoteExternalCIDR) + overlapsWithExternalCIDR, err = liqoIPAM.overlapsWithNetwork(network, subnets.RemoteExternalCIDR) if err != nil { return } - if overlaps { + if overlapsWithPodCIDR || overlapsWithExternalCIDR { + overlaps = true overlappingCluster = cluster return } @@ -208,7 +251,7 @@ func (liqoIPAM *IPAM) overlapsWithPool(network string) (overlappingPool string, return } -/* Function that receives a network as parameter and returns the pool to which this network belongs to. */ +// Function that receives a network as parameter and returns the pool to which this network belongs to. func (liqoIPAM *IPAM) getPoolFromNetwork(network string) (networkPool string, success bool, err error) { var poolIPset netaddr.IPSetBuilder var c netaddr.IPPrefix @@ -324,8 +367,8 @@ func (liqoIPAM *IPAM) GetSubnetsPerCluster( // Check existence subnets, exists := clusterSubnets[clusterID] - if exists && subnets.PodCIDR != "" && subnets.LocalExternalCIDR != "" { - return subnets.PodCIDR, subnets.LocalExternalCIDR, nil + if exists && subnets.RemotePodCIDR != "" && subnets.RemoteExternalCIDR != "" { + return subnets.RemotePodCIDR, subnets.RemoteExternalCIDR, nil } // Check if podCidr is a valid CIDR @@ -362,14 +405,15 @@ func (liqoIPAM *IPAM) GetSubnetsPerCluster( if !exists { // Create cluster network configuration subnets = netv1alpha1.Subnets{ - PodCIDR: mappedPodCIDR, - LocalExternalCIDR: mappedExternalCIDR, - RemoteExternalCIDR: "", + LocalNATPodCIDR: "", + RemotePodCIDR: mappedPodCIDR, + RemoteExternalCIDR: mappedExternalCIDR, + LocalNATExternalCIDR: "", } } else { // Update cluster network configuration - subnets.PodCIDR = mappedPodCIDR - subnets.LocalExternalCIDR = mappedExternalCIDR + subnets.RemotePodCIDR = mappedPodCIDR + subnets.RemoteExternalCIDR = mappedExternalCIDR } clusterSubnets[clusterID] = subnets @@ -430,7 +474,7 @@ func (liqoIPAM *IPAM) freePoolInHalves(pool string) error { return nil } -/* FreeReservedSubnet marks as free a reserved subnet. */ +// FreeReservedSubnet marks as free a reserved subnet. func (liqoIPAM *IPAM) FreeReservedSubnet(network string) error { var p *goipam.Prefix @@ -468,7 +512,10 @@ func (liqoIPAM *IPAM) eventuallyDeleteClusterSubnet(clusterID string, subnets := clusterSubnets[clusterID] // Check is all field are the empty string - if subnets.PodCIDR == "" && subnets.LocalExternalCIDR == "" && subnets.RemoteExternalCIDR == "" { + if subnets.RemotePodCIDR == "" && + subnets.LocalNATPodCIDR == "" && + subnets.RemoteExternalCIDR == "" && + subnets.LocalNATExternalCIDR == "" { // Delete entry delete(clusterSubnets, clusterID) } @@ -479,7 +526,7 @@ func (liqoIPAM *IPAM) eventuallyDeleteClusterSubnet(clusterID string, return nil } -/* FreeSubnetPerCluster marks as free the network previously allocated for cluster clusterid. */ +// FreeSubnetsPerCluster marks as free the network previously allocated for cluster clusterID. func (liqoIPAM *IPAM) FreeSubnetsPerCluster(clusterID string) error { var subnets netv1alpha1.Subnets var exists bool @@ -489,21 +536,21 @@ func (liqoIPAM *IPAM) FreeSubnetsPerCluster(clusterID string) error { return fmt.Errorf("cannot get cluster subnets: %w", err) } subnets, exists = clusterSubnets[clusterID] - if !exists || subnets.PodCIDR == "" || subnets.LocalExternalCIDR == "" { - //Networks do not exist + if !exists || subnets.RemotePodCIDR == "" || subnets.RemoteExternalCIDR == "" { + // Networks do not exist return nil } // Free PodCidr - if err := liqoIPAM.FreeReservedSubnet(subnets.PodCIDR); err != nil { + if err := liqoIPAM.FreeReservedSubnet(subnets.RemotePodCIDR); err != nil { return err } - subnets.PodCIDR = "" + subnets.RemotePodCIDR = "" // Free ExternalCidr - if err := liqoIPAM.FreeReservedSubnet(subnets.LocalExternalCIDR); err != nil { + if err := liqoIPAM.FreeReservedSubnet(subnets.RemoteExternalCIDR); err != nil { return err } - subnets.LocalExternalCIDR = "" + subnets.RemoteExternalCIDR = "" clusterSubnets[clusterID] = subnets @@ -611,9 +658,9 @@ func (liqoIPAM *IPAM) RemoveNetworkPool(network string) error { return nil } -// AddExternalCIDRPerCluster stores (without reserving) an ExternalCIDR for a remote cluster -// since this network is used in the remote cluster. -func (liqoIPAM *IPAM) AddExternalCIDRPerCluster(network, clusterID string) error { +// AddLocalSubnetsPerCluster stores how the PodCIDR and the ExternalCIDR of local cluster +// has been remapped in a remote cluster. If no remapping happened, then the CIDR value should be equal to "None". +func (liqoIPAM *IPAM) AddLocalSubnetsPerCluster(podCIDR, externalCIDR, clusterID string) error { var exists bool var subnets netv1alpha1.Subnets // Get cluster subnets @@ -623,22 +670,25 @@ func (liqoIPAM *IPAM) AddExternalCIDRPerCluster(network, clusterID string) error } // Check existence subnets, exists = clusterSubnets[clusterID] - if exists && subnets.RemoteExternalCIDR != "" { + if exists && subnets.LocalNATPodCIDR != "" && subnets.LocalNATExternalCIDR != "" { return nil } - // Set remote ExternalCIDR + // Set networks if exists { - subnets.RemoteExternalCIDR = network + subnets.LocalNATPodCIDR = podCIDR + subnets.LocalNATExternalCIDR = externalCIDR } else { subnets = netv1alpha1.Subnets{ - PodCIDR: "", - LocalExternalCIDR: "", - RemoteExternalCIDR: network, + LocalNATPodCIDR: podCIDR, + RemotePodCIDR: "", + LocalNATExternalCIDR: externalCIDR, + RemoteExternalCIDR: "", } } clusterSubnets[clusterID] = subnets - klog.Infof("Remote ExternalCIDR of cluster %s set to %s", clusterID, network) + klog.Infof("Local NAT PodCIDR of cluster %s set to %s", clusterID, podCIDR) + klog.Infof("Local NAT ExternalCIDR of cluster %s set to %s", clusterID, externalCIDR) // Push it in clusterSubnets if err := liqoIPAM.ipamStorage.updateClusterSubnets(clusterSubnets); err != nil { return fmt.Errorf("cannot update cluster subnets:%w", err) @@ -646,8 +696,8 @@ func (liqoIPAM *IPAM) AddExternalCIDRPerCluster(network, clusterID string) error return nil } -// RemoveExternalCIDRPerCluster deletes an ExternalCIDR for a cluster. -func (liqoIPAM *IPAM) RemoveExternalCIDRPerCluster(clusterID string) error { +// RemoveLocalSubnetsPerCluster deletes networks related to a cluster. +func (liqoIPAM *IPAM) RemoveLocalSubnetsPerCluster(clusterID string) error { var exists bool var subnets netv1alpha1.Subnets // Get cluster subnets @@ -657,23 +707,24 @@ func (liqoIPAM *IPAM) RemoveExternalCIDRPerCluster(clusterID string) error { } // Check existence subnets, exists = clusterSubnets[clusterID] - if !exists || subnets.RemoteExternalCIDR == "" { + if !exists || (subnets.LocalNATPodCIDR == "" && subnets.LocalNATExternalCIDR == "") { return nil } - // Unset remote ExternalCIDR - subnets.RemoteExternalCIDR = "" + // Unset networks + subnets.LocalNATPodCIDR = "" + subnets.LocalNATExternalCIDR = "" clusterSubnets[clusterID] = subnets - klog.Infof("Remote ExternalCIDR of cluster %s deleted", clusterID) + klog.Infof("Local NAT networks of cluster %s deleted", clusterID) if err := liqoIPAM.eventuallyDeleteClusterSubnet(clusterID, clusterSubnets); err != nil { return err } return nil } -// GetClusterExternalCIDR eventually choose and returns the local cluster's ExternalCIDR. -func (liqoIPAM *IPAM) GetClusterExternalCIDR(mask uint8) (string, error) { +// GetExternalCIDR chooses and returns the local cluster's ExternalCIDR. +func (liqoIPAM *IPAM) GetExternalCIDR(mask uint8) (string, error) { var externalCIDR string var err error // Get cluster ExternalCIDR @@ -693,3 +744,252 @@ func (liqoIPAM *IPAM) GetClusterExternalCIDR(mask uint8) (string, error) { } return externalCIDR, nil } + +// ipBelongsToPodCIDR returns true if the received IP address belongs to the local PodCIDR, returns false otherwise. +func (liqoIPAM *IPAM) ipBelongsToPodCIDR(ip string) (bool, error) { + // Get PodCIDR + podCIDR, err := liqoIPAM.ipamStorage.getPodCIDR() + if err != nil { + return false, fmt.Errorf("cannot get cluster PodCIDR: %w", err) + } + if podCIDR == "" { + return false, fmt.Errorf("cluster PodCIDR is not set") + } + // Parse PodCIDR to netaddr format + p, err := netaddr.ParseIPPrefix(podCIDR) + if err != nil { + return false, fmt.Errorf("cannot parse PodCIDR:%w", err) + } + return p.Contains(netaddr.MustParseIP(ip)), nil +} + +/* mapIPToExternalCIDR acquires an IP belonging to the local ExternalCIDR for the specific IP and +if necessary maps it using the remoteExternalCIDR (this means remote cluster has remapped local ExternalCIDR) +Further invocations passing the same IP won't acquire a new IP, but will use the one already acquired. */ +func (liqoIPAM *IPAM) mapIPToExternalCIDR(clusterID, remoteExternalCIDR, ip string) (string, error) { + var externalCIDR string + // Get endpointMappings + endpointMappings, err := liqoIPAM.ipamStorage.getEndpointMappings() + if err != nil { + return "", fmt.Errorf("cannot get Endpoint IPs: %w", err) + } + + // Get local ExternalCIDR + localExternalCIDR, err := liqoIPAM.ipamStorage.getExternalCIDR() + if err != nil { + return "", fmt.Errorf("cannot get ExternalCIDR: %w", err) + } + + if remoteExternalCIDR == "None" { + externalCIDR = localExternalCIDR + } else { + externalCIDR = remoteExternalCIDR + } + + // Check entry existence + endpointMapping, exists := endpointMappings[ip] + if !exists { + // Acquire IP + ipamIP, err := liqoIPAM.ipam.AcquireIP(localExternalCIDR) + if err != nil { + return "", fmt.Errorf("cannot allocate a new IP for endpoint %s:%w", ip, err) + } + klog.Infof("IP %s has been acquired for endpoint %s", ipamIP.IP.String(), ip) + // Create new entry + entry := netv1alpha1.EndpointMapping{ + IP: ipamIP.IP.String(), + ClusterMappings: make(map[string]netv1alpha1.ClusterMapping), + } + endpointMapping = entry + endpointMappings[ip] = entry + } + + // Update clusterMappings + endpointMapping.ClusterMappings[clusterID] = netv1alpha1.ClusterMapping{} + endpointMappings[ip] = endpointMapping + + // Update endpointMappings + if err := liqoIPAM.ipamStorage.updateEndpointMappings(endpointMappings); err != nil { + return "", fmt.Errorf("cannot update endpointMappings:%w", err) + } + + // Map IP if remote cluster has remapped local ExternalCIDR + newIP, err := MapIPToNetwork(externalCIDR, endpointMapping.IP) + if err != nil { + return "", fmt.Errorf("cannot map endpoint IP %s to ExternalCIDR:%w", endpointMapping.IP, err) + } + return newIP, nil +} + +/* mapEndpointIPInternal is the internal implementation of MapEndpointIP gRPC. +If the received IP belongs to local PodCIDR, then it maps the address in the traditional way, +i.e. using the network used in the remote cluster for local PodCIDR. +If the received IP does not belong to local PodCIDR, then it maps the address using the ExternalCIDR.*/ +func (liqoIPAM *IPAM) mapEndpointIPInternal(clusterID, ip string) (string, error) { + var subnets netv1alpha1.Subnets + var exists bool + + // Parse IP + if netIP := net.ParseIP(ip); netIP == nil { + return "", fmt.Errorf("cannot parse endpointIP %s", ip) + } + + // Get cluster subnets + clusterSubnets, err := liqoIPAM.ipamStorage.getClusterSubnets() + if err != nil { + return "", fmt.Errorf("cannot get cluster subnets:%w", err) + } + subnets, exists = clusterSubnets[clusterID] + + belongs, err := liqoIPAM.ipBelongsToPodCIDR(ip) + if err != nil { + return "", fmt.Errorf("cannot establish if IP %s belongs to PodCIDR:%w", ip, err) + } + if belongs { + // Check existence of local NAT PodCIDR + if !exists || subnets.LocalNATPodCIDR == "" { + return "", fmt.Errorf("remote cluster %s has not a local NAT PodCIDR", clusterID) + } + /* IP belongs to local PodCIDR, this means the Pod is a local Pod and + the new IP should belong to the network used in the remote cluster + for local Pods: this can be either the cluster PodCIDR or a different network */ + newIP, err := MapIPToNetwork(subnets.LocalNATPodCIDR, ip) + if err != nil { + return "", fmt.Errorf("cannot map endpoint IP %s to PodCIDR of remote cluster %s:%w", ip, clusterID, err) + } + return newIP, nil + } + // IP does not belong to cluster PodCIDR: Pod is a reflected Pod + + // Check existence of RemoteExternalCIDR + if !exists || subnets.LocalNATExternalCIDR == "" { + return "", fmt.Errorf("remote cluster %s has not a remote ExternalCIDR:%w", clusterID, err) + } + + // Map IP to ExternalCIDR + newIP, err := liqoIPAM.mapIPToExternalCIDR(clusterID, subnets.LocalNATExternalCIDR, ip) + if err != nil { + return "", fmt.Errorf("cannot map endpoint IP %s to ExternalCIDR of cluster %s:%w", ip, clusterID, err) + } + + return newIP, nil +} + +// MapEndpointIP receives a service endpoint IP and a cluster identifier and, +// if the endpoint IP does not belong to cluster PodCIDR, maps +// the endpoint IP to a new IP taken from the remote ExternalCIDR of the remote cluster. +func (liqoIPAM *IPAM) MapEndpointIP(ctx context.Context, mapRequest *MapRequest) (*MapResponse, error) { + mappedIP, err := liqoIPAM.mapEndpointIPInternal(mapRequest.GetClusterID(), mapRequest.GetIp()) + if err != nil { + return &MapResponse{}, fmt.Errorf("cannot map endpoint IP to ExternalCIDR of cluster %s, %w", + mapRequest.GetClusterID(), err) + } + return &MapResponse{Ip: mappedIP}, nil +} + +// unmapEndpointIPInternal is the internal implementation of UnmapEndpointIP. +// If the endpointIP is not reflected anymore in any remote cluster, then it frees the corresponding ExternalCIDR IP. +func (liqoIPAM *IPAM) unmapEndpointIPInternal(clusterID, endpointIP string) error { + var exists bool + + // Get endpointMappings + endpointMappings, err := liqoIPAM.ipamStorage.getEndpointMappings() + if err != nil { + return fmt.Errorf("cannot get Endpoint IPs: %w", err) + } + + // Get local ExternalCIDR + localExternalCIDR, err := liqoIPAM.ipamStorage.getExternalCIDR() + if err != nil { + return fmt.Errorf("cannot get ExternalCIDR: %w", err) + } + + endpointMapping, exists := endpointMappings[endpointIP] + if !exists { + // a. the entry does not exists because the endpointIP is an IP + // belonging to the local PodCIDR, therefore there is no need of do nothing. + // b. the entry does not exists because it was already deleted, same as above. + return nil + } + + // Set endpoint IP as unused by deleting entry of cluster + delete(endpointMapping.ClusterMappings, clusterID) + + if len(endpointMapping.ClusterMappings) == 0 { + // There are no more clusters using this endpoint IP + // Free IP + if err := liqoIPAM.ipam.ReleaseIPFromPrefix(localExternalCIDR, endpointMappings[endpointIP].IP); err != nil { + return fmt.Errorf("cannot free IP: %w", err) + } + klog.Infof("IP %s (mapped from %s) has been freed", endpointMappings[endpointIP].IP, endpointIP) + // Delete entry + delete(endpointMappings, endpointIP) + } + + // Push update + if err := liqoIPAM.ipamStorage.updateEndpointMappings(endpointMappings); err != nil { + return fmt.Errorf("cannot update endpointIPs:%w", err) + } + return nil +} + +// UnmapEndpointIP set the endpoint as unused for a specific cluster. +func (liqoIPAM *IPAM) UnmapEndpointIP(ctx context.Context, unmapRequest *UnmapRequest) (*UnmapResponse, error) { + err := liqoIPAM.unmapEndpointIPInternal(unmapRequest.GetClusterID(), unmapRequest.GetIp()) + if err != nil { + return &UnmapResponse{}, fmt.Errorf("cannot unmap the IP of endpoint %s:%w", unmapRequest.GetIp(), err) + } + return &UnmapResponse{}, nil +} + +// SetPodCIDR sets the PodCIDR. +func (liqoIPAM *IPAM) SetPodCIDR(podCIDR string) error { + var oldPodCIDR string + var err error + // Get PodCIDR + oldPodCIDR, err = liqoIPAM.ipamStorage.getPodCIDR() + if err != nil { + return fmt.Errorf("cannot get PodCIDR:%w", err) + } + if oldPodCIDR != "" && oldPodCIDR != podCIDR { + return fmt.Errorf("trying to change PodCIDR") + } + if oldPodCIDR != "" && oldPodCIDR == podCIDR { + return nil + } + // Acquire PodCIDR + if err := liqoIPAM.AcquireReservedSubnet(podCIDR); err != nil { + return fmt.Errorf("cannot acquire PodCIDR:%w", err) + } + // Update PodCIDR + if err := liqoIPAM.ipamStorage.updatePodCIDR(podCIDR); err != nil { + return fmt.Errorf("cannot set PodCIDR:%w", err) + } + return nil +} + +// SetServiceCIDR sets the ServiceCIDR. +func (liqoIPAM *IPAM) SetServiceCIDR(serviceCIDR string) error { + var oldServiceCIDR string + var err error + // Get ServiceCIDR + oldServiceCIDR, err = liqoIPAM.ipamStorage.getServiceCIDR() + if err != nil { + return fmt.Errorf("cannot get ServiceCIDR:%w", err) + } + if oldServiceCIDR != "" && oldServiceCIDR != serviceCIDR { + return fmt.Errorf("trying to change ServiceCIDR") + } + if oldServiceCIDR != "" && oldServiceCIDR == serviceCIDR { + return nil + } + // Acquire Service CIDR + if err := liqoIPAM.AcquireReservedSubnet(serviceCIDR); err != nil { + return fmt.Errorf("cannot acquire ServiceCIDR:%w", err) + } + // Update Service CIDR + if err := liqoIPAM.ipamStorage.updateServiceCIDR(serviceCIDR); err != nil { + return fmt.Errorf("cannot set ServiceCIDR:%w", err) + } + return nil +} diff --git a/pkg/liqonet/ipam.pb.go b/pkg/liqonet/ipam.pb.go new file mode 100644 index 0000000000..6cc43e1717 --- /dev/null +++ b/pkg/liqonet/ipam.pb.go @@ -0,0 +1,348 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.15.5 +// source: pkg/liqonet/ipam.proto + +package liqonet + +import ( + reflect "reflect" + sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type MapRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ClusterID string `protobuf:"bytes,1,opt,name=clusterID,proto3" json:"clusterID,omitempty"` + Ip string `protobuf:"bytes,2,opt,name=ip,proto3" json:"ip,omitempty"` +} + +func (x *MapRequest) Reset() { + *x = MapRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_liqonet_ipam_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MapRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MapRequest) ProtoMessage() {} + +func (x *MapRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_liqonet_ipam_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MapRequest.ProtoReflect.Descriptor instead. +func (*MapRequest) Descriptor() ([]byte, []int) { + return file_pkg_liqonet_ipam_proto_rawDescGZIP(), []int{0} +} + +func (x *MapRequest) GetClusterID() string { + if x != nil { + return x.ClusterID + } + return "" +} + +func (x *MapRequest) GetIp() string { + if x != nil { + return x.Ip + } + return "" +} + +type MapResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Ip string `protobuf:"bytes,1,opt,name=ip,proto3" json:"ip,omitempty"` +} + +func (x *MapResponse) Reset() { + *x = MapResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_liqonet_ipam_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MapResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MapResponse) ProtoMessage() {} + +func (x *MapResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_liqonet_ipam_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MapResponse.ProtoReflect.Descriptor instead. +func (*MapResponse) Descriptor() ([]byte, []int) { + return file_pkg_liqonet_ipam_proto_rawDescGZIP(), []int{1} +} + +func (x *MapResponse) GetIp() string { + if x != nil { + return x.Ip + } + return "" +} + +type UnmapRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ClusterID string `protobuf:"bytes,1,opt,name=clusterID,proto3" json:"clusterID,omitempty"` + Ip string `protobuf:"bytes,2,opt,name=ip,proto3" json:"ip,omitempty"` +} + +func (x *UnmapRequest) Reset() { + *x = UnmapRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_liqonet_ipam_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UnmapRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UnmapRequest) ProtoMessage() {} + +func (x *UnmapRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_liqonet_ipam_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UnmapRequest.ProtoReflect.Descriptor instead. +func (*UnmapRequest) Descriptor() ([]byte, []int) { + return file_pkg_liqonet_ipam_proto_rawDescGZIP(), []int{2} +} + +func (x *UnmapRequest) GetClusterID() string { + if x != nil { + return x.ClusterID + } + return "" +} + +func (x *UnmapRequest) GetIp() string { + if x != nil { + return x.Ip + } + return "" +} + +type UnmapResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *UnmapResponse) Reset() { + *x = UnmapResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_liqonet_ipam_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UnmapResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UnmapResponse) ProtoMessage() {} + +func (x *UnmapResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_liqonet_ipam_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UnmapResponse.ProtoReflect.Descriptor instead. +func (*UnmapResponse) Descriptor() ([]byte, []int) { + return file_pkg_liqonet_ipam_proto_rawDescGZIP(), []int{3} +} + +var File_pkg_liqonet_ipam_proto protoreflect.FileDescriptor + +var file_pkg_liqonet_ipam_proto_rawDesc = []byte{ + 0x0a, 0x16, 0x70, 0x6b, 0x67, 0x2f, 0x6c, 0x69, 0x71, 0x6f, 0x6e, 0x65, 0x74, 0x2f, 0x69, 0x70, + 0x61, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x3a, 0x0a, 0x0a, 0x4d, 0x61, 0x70, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x02, 0x69, 0x70, 0x22, 0x1d, 0x0a, 0x0b, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x69, 0x70, 0x22, 0x3c, 0x0a, 0x0c, 0x55, 0x6e, 0x6d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x44, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, + 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, + 0x70, 0x22, 0x0f, 0x0a, 0x0d, 0x55, 0x6e, 0x6d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x32, 0x64, 0x0a, 0x04, 0x69, 0x70, 0x61, 0x6d, 0x12, 0x2a, 0x0a, 0x0d, 0x4d, 0x61, + 0x70, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x49, 0x50, 0x12, 0x0b, 0x2e, 0x4d, 0x61, + 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x30, 0x0a, 0x0f, 0x55, 0x6e, 0x6d, 0x61, 0x70, 0x45, + 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x49, 0x50, 0x12, 0x0d, 0x2e, 0x55, 0x6e, 0x6d, 0x61, + 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0e, 0x2e, 0x55, 0x6e, 0x6d, 0x61, 0x70, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6c, 0x69, + 0x71, 0x6f, 0x6e, 0x65, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pkg_liqonet_ipam_proto_rawDescOnce sync.Once + file_pkg_liqonet_ipam_proto_rawDescData = file_pkg_liqonet_ipam_proto_rawDesc +) + +func file_pkg_liqonet_ipam_proto_rawDescGZIP() []byte { + file_pkg_liqonet_ipam_proto_rawDescOnce.Do(func() { + file_pkg_liqonet_ipam_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_liqonet_ipam_proto_rawDescData) + }) + return file_pkg_liqonet_ipam_proto_rawDescData +} + +var file_pkg_liqonet_ipam_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_pkg_liqonet_ipam_proto_goTypes = []interface{}{ + (*MapRequest)(nil), // 0: MapRequest + (*MapResponse)(nil), // 1: MapResponse + (*UnmapRequest)(nil), // 2: UnmapRequest + (*UnmapResponse)(nil), // 3: UnmapResponse +} +var file_pkg_liqonet_ipam_proto_depIdxs = []int32{ + 0, // 0: ipam.MapEndpointIP:input_type -> MapRequest + 2, // 1: ipam.UnmapEndpointIP:input_type -> UnmapRequest + 1, // 2: ipam.MapEndpointIP:output_type -> MapResponse + 3, // 3: ipam.UnmapEndpointIP:output_type -> UnmapResponse + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_pkg_liqonet_ipam_proto_init() } +func file_pkg_liqonet_ipam_proto_init() { + if File_pkg_liqonet_ipam_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pkg_liqonet_ipam_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MapRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_liqonet_ipam_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MapResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_liqonet_ipam_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UnmapRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_liqonet_ipam_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UnmapResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pkg_liqonet_ipam_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pkg_liqonet_ipam_proto_goTypes, + DependencyIndexes: file_pkg_liqonet_ipam_proto_depIdxs, + MessageInfos: file_pkg_liqonet_ipam_proto_msgTypes, + }.Build() + File_pkg_liqonet_ipam_proto = out.File + file_pkg_liqonet_ipam_proto_rawDesc = nil + file_pkg_liqonet_ipam_proto_goTypes = nil + file_pkg_liqonet_ipam_proto_depIdxs = nil +} diff --git a/pkg/liqonet/ipam.proto b/pkg/liqonet/ipam.proto new file mode 100644 index 0000000000..7ec88b3d13 --- /dev/null +++ b/pkg/liqonet/ipam.proto @@ -0,0 +1,24 @@ +syntax="proto3"; +option go_package = "./liqonet"; + +service ipam { + rpc MapEndpointIP (MapRequest) returns (MapResponse); + rpc UnmapEndpointIP (UnmapRequest) returns (UnmapResponse); +} + +message MapRequest { + string clusterID = 1; + string ip = 2; +} + +message MapResponse { + string ip = 1; +} + +message UnmapRequest { + string clusterID = 1; + string ip = 2; +} + +message UnmapResponse {} + diff --git a/pkg/liqonet/ipamStorage.go b/pkg/liqonet/ipamStorage.go index e2a6482b0b..b0e0dab2ed 100644 --- a/pkg/liqonet/ipamStorage.go +++ b/pkg/liqonet/ipamStorage.go @@ -18,20 +18,31 @@ import ( netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" ) -const ipamNamePrefix = "ipamstorage-" -const clusterSubnetUpdate = "clusterSubnets" -const poolsUpdate = "pools" -const prefixesUpdate = "prefixes" -const externalCIDRUpdate = "externalCIDR" +const ( + ipamNamePrefix = "ipamstorage-" + clusterSubnetUpdate = "clusterSubnets" + poolsUpdate = "pools" + prefixesUpdate = "prefixes" + externalCIDRUpdate = "externalCIDR" + endpointMappingsUpdate = "endpointMappings" + podCIDRUpdate = "podCIDR" + serviceCIDRUpdate = "serviceCIDR" +) // IpamStorage is the interface to be implemented to enforce persistency in IPAM. type IpamStorage interface { updateClusterSubnets(clusterSubnet map[string]netv1alpha1.Subnets) error updatePools(pools []string) error updateExternalCIDR(externalCIDR string) error + updateEndpointMappings(endpoints map[string]netv1alpha1.EndpointMapping) error + updatePodCIDR(podCIDR string) error + updateServiceCIDR(serviceCIDR string) error getClusterSubnets() (map[string]netv1alpha1.Subnets, error) getPools() ([]string, error) getExternalCIDR() (string, error) + getEndpointMappings() (map[string]netv1alpha1.EndpointMapping, error) + getPodCIDR() (string, error) + getServiceCIDR() (string, error) goipam.Storage } @@ -41,7 +52,8 @@ type IPAMStorage struct { resourceName string } -// NewIPAMStorage inits the storage of the IPAM module, retrieving an existing ipamStorage resource or creating a new one. +// NewIPAMStorage inits the storage of the IPAM module, +// retrieving an existing ipamStorage resource or creating a new one. func NewIPAMStorage(dynClient dynamic.Interface) (*IPAMStorage, error) { klog.Infof("Init IPAM storage..") ipamStorage := &IPAMStorage{} @@ -63,9 +75,10 @@ func NewIPAMStorage(dynClient dynamic.Interface) (*IPAMStorage, error) { Labels: map[string]string{"net.liqo.io/ipamstorage": "true"}, }, Spec: netv1alpha1.IpamSpec{ - Prefixes: make(map[string][]byte), - Pools: make([]string, 0), - ClusterSubnets: make(map[string]netv1alpha1.Subnets), + Prefixes: make(map[string][]byte), + Pools: make([]string, 0), + ClusterSubnets: make(map[string]netv1alpha1.Subnets), + EndpointMappings: make(map[string]netv1alpha1.EndpointMapping), }, } unstructuredIpam, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ipam) @@ -220,6 +233,15 @@ func (ipamStorage *IPAMStorage) updatePrefixes(prefixes map[string][]byte) error func (ipamStorage *IPAMStorage) updateExternalCIDR(externalCIDR string) error { return ipamStorage.updateConfig(externalCIDRUpdate, externalCIDR) } +func (ipamStorage *IPAMStorage) updateEndpointMappings(endpoints map[string]netv1alpha1.EndpointMapping) error { + return ipamStorage.updateConfig(endpointMappingsUpdate, endpoints) +} +func (ipamStorage *IPAMStorage) updatePodCIDR(podCIDR string) error { + return ipamStorage.updateConfig(podCIDRUpdate, podCIDR) +} +func (ipamStorage *IPAMStorage) updateServiceCIDR(serviceCIDR string) error { + return ipamStorage.updateConfig(serviceCIDRUpdate, serviceCIDR) +} func (ipamStorage *IPAMStorage) updateConfig(updateType string, data interface{}) error { jsonData, err := json.Marshal(data) @@ -271,6 +293,27 @@ func (ipamStorage *IPAMStorage) getExternalCIDR() (string, error) { } return ipam.Spec.ExternalCIDR, nil } +func (ipamStorage *IPAMStorage) getEndpointMappings() (map[string]netv1alpha1.EndpointMapping, error) { + ipam, err := ipamStorage.getConfig() + if err != nil { + return nil, err + } + return ipam.Spec.EndpointMappings, nil +} +func (ipamStorage *IPAMStorage) getPodCIDR() (string, error) { + ipam, err := ipamStorage.getConfig() + if err != nil { + return "", err + } + return ipam.Spec.PodCIDR, nil +} +func (ipamStorage *IPAMStorage) getServiceCIDR() (string, error) { + ipam, err := ipamStorage.getConfig() + if err != nil { + return "", err + } + return ipam.Spec.ServiceCIDR, nil +} func (ipamStorage *IPAMStorage) getConfig() (*netv1alpha1.IpamStorage, error) { res := &netv1alpha1.IpamStorage{} diff --git a/pkg/liqonet/ipam_grpc.pb.go b/pkg/liqonet/ipam_grpc.pb.go new file mode 100644 index 0000000000..e5091dd70d --- /dev/null +++ b/pkg/liqonet/ipam_grpc.pb.go @@ -0,0 +1,138 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package liqonet + +import ( + context "context" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// IpamClient is the client API for Ipam service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type IpamClient interface { + MapEndpointIP(ctx context.Context, in *MapRequest, opts ...grpc.CallOption) (*MapResponse, error) + UnmapEndpointIP(ctx context.Context, in *UnmapRequest, opts ...grpc.CallOption) (*UnmapResponse, error) +} + +type ipamClient struct { + cc grpc.ClientConnInterface +} + +func NewIpamClient(cc grpc.ClientConnInterface) IpamClient { + return &ipamClient{cc} +} + +func (c *ipamClient) MapEndpointIP(ctx context.Context, in *MapRequest, opts ...grpc.CallOption) (*MapResponse, error) { + out := new(MapResponse) + err := c.cc.Invoke(ctx, "/ipam/MapEndpointIP", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *ipamClient) UnmapEndpointIP(ctx context.Context, in *UnmapRequest, opts ...grpc.CallOption) (*UnmapResponse, error) { + out := new(UnmapResponse) + err := c.cc.Invoke(ctx, "/ipam/UnmapEndpointIP", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// IpamServer is the server API for Ipam service. +// All implementations must embed UnimplementedIpamServer +// for forward compatibility +type IpamServer interface { + MapEndpointIP(context.Context, *MapRequest) (*MapResponse, error) + UnmapEndpointIP(context.Context, *UnmapRequest) (*UnmapResponse, error) + mustEmbedUnimplementedIpamServer() +} + +// UnimplementedIpamServer must be embedded to have forward compatible implementations. +type UnimplementedIpamServer struct { +} + +func (UnimplementedIpamServer) MapEndpointIP(context.Context, *MapRequest) (*MapResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method MapEndpointIP not implemented") +} +func (UnimplementedIpamServer) UnmapEndpointIP(context.Context, *UnmapRequest) (*UnmapResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method UnmapEndpointIP not implemented") +} +func (UnimplementedIpamServer) mustEmbedUnimplementedIpamServer() {} + +// UnsafeIpamServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to IpamServer will +// result in compilation errors. +type UnsafeIpamServer interface { + mustEmbedUnimplementedIpamServer() +} + +func RegisterIpamServer(s grpc.ServiceRegistrar, srv IpamServer) { + s.RegisterService(&Ipam_ServiceDesc, srv) +} + +func _Ipam_MapEndpointIP_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(MapRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(IpamServer).MapEndpointIP(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/ipam/MapEndpointIP", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(IpamServer).MapEndpointIP(ctx, req.(*MapRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Ipam_UnmapEndpointIP_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(UnmapRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(IpamServer).UnmapEndpointIP(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/ipam/UnmapEndpointIP", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(IpamServer).UnmapEndpointIP(ctx, req.(*UnmapRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Ipam_ServiceDesc is the grpc.ServiceDesc for Ipam service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Ipam_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "ipam", + HandlerType: (*IpamServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "MapEndpointIP", + Handler: _Ipam_MapEndpointIP_Handler, + }, + { + MethodName: "UnmapEndpointIP", + Handler: _Ipam_UnmapEndpointIP_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "pkg/liqonet/ipam.proto", +} diff --git a/pkg/liqonet/ipam_test.go b/pkg/liqonet/ipam_test.go index 721821013c..ed5ed5846c 100644 --- a/pkg/liqonet/ipam_test.go +++ b/pkg/liqonet/ipam_test.go @@ -1,8 +1,14 @@ package liqonet_test import ( + "context" + "fmt" + "strings" + + "math/rand" + . "github.com/onsi/ginkgo" - "github.com/onsi/gomega" + . "github.com/onsi/gomega" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" @@ -44,6 +50,7 @@ func fillNetworkPool(pool string, ipam *liqonet.IPAM) error { } var _ = Describe("Ipam", func() { + rand.Seed(1) BeforeEach(func() { scheme := runtime.NewScheme() @@ -61,8 +68,11 @@ var _ = Describe("Ipam", func() { m[s] = "ipamstoragesList" dynClient = fake.NewSimpleDynamicClientWithCustomListKinds(scheme, m, &liqonetapi.IpamStorage{}) ipam = liqonet.NewIPAM() - err := ipam.Init(liqonet.Pools, dynClient) - gomega.Expect(err).To(gomega.BeNil()) + err := ipam.Init(liqonet.Pools, dynClient, 2000+rand.Intn(2000)) + Expect(err).To(BeNil()) + }) + AfterEach(func() { + ipam.StopGRPCServer() }) Describe("AcquireReservedSubnet", func() { @@ -70,38 +80,38 @@ var _ = Describe("Ipam", func() { It("Should successfully reserve the subnet", func() { // Reserve network err := ipam.AcquireReservedSubnet("10.0.0.0/8") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Try to get a cluster network in that pool p, _, err := ipam.GetSubnetsPerCluster("10.0.2.0/24", "192.168.0.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // p should have been mapped to a new network belonging to a different pool - gomega.Expect(p).ToNot(gomega.HavePrefix("10.")) + Expect(p).ToNot(HavePrefix("10.")) }) }) Context("When the reserved network belongs to a pool", func() { It("Should not be possible to acquire the same network for a cluster", func() { err := ipam.AcquireReservedSubnet("10.244.0.0/24") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) p, e, err := ipam.GetSubnetsPerCluster("10.244.0.0/24", "192.168.0.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).ToNot(gomega.Equal("10.0.2.0/24")) - gomega.Expect(e).To(gomega.Equal("192.168.0.0/24")) + Expect(err).To(BeNil()) + Expect(p).ToNot(Equal("10.0.2.0/24")) + Expect(e).To(Equal("192.168.0.0/24")) }) It("Should not be possible to acquire a larger network that contains it for a cluster", func() { err := ipam.AcquireReservedSubnet("10.0.0.0/24") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) p, e, err := ipam.GetSubnetsPerCluster("10.0.0.0/16", "192.168.0.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).ToNot(gomega.Equal("10.0.0.0/16")) - gomega.Expect(e).To(gomega.Equal("192.168.0.0/24")) + Expect(err).To(BeNil()) + Expect(p).ToNot(Equal("10.0.0.0/16")) + Expect(e).To(Equal("192.168.0.0/24")) }) It("Should not be possible to acquire a smaller network contained by it for a cluster", func() { err := ipam.AcquireReservedSubnet("10.0.2.0/24") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) p, e, err := ipam.GetSubnetsPerCluster("10.0.2.0/25", "192.168.0.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).ToNot(gomega.Equal("10.0.2.0/25")) - gomega.Expect(e).To(gomega.Equal("192.168.0.0/24")) + Expect(err).To(BeNil()) + Expect(p).ToNot(Equal("10.0.2.0/25")) + Expect(e).To(Equal("192.168.0.0/24")) }) }) }) @@ -111,24 +121,24 @@ var _ = Describe("Ipam", func() { Context("and the subnets have not already been assigned to any other cluster", func() { It("Should allocate the subnets without mapping", func() { p, e, err := ipam.GetSubnetsPerCluster("11.0.0.0/16", "11.1.0.0/16", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).To(gomega.Equal("11.0.0.0/16")) - gomega.Expect(e).To(gomega.Equal("11.1.0.0/16")) + Expect(err).To(BeNil()) + Expect(p).To(Equal("11.0.0.0/16")) + Expect(e).To(Equal("11.1.0.0/16")) }) }) Context("and the subnets have already been assigned to another cluster", func() { Context("and there are available networks with the same mask length in one pool", func() { It("should map the requested networks", func() { p, e, err := ipam.GetSubnetsPerCluster("11.0.0.0/16", "11.1.0.0/16", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).To(gomega.Equal("11.0.0.0/16")) - gomega.Expect(e).To(gomega.Equal("11.1.0.0/16")) + Expect(err).To(BeNil()) + Expect(p).To(Equal("11.0.0.0/16")) + Expect(e).To(Equal("11.1.0.0/16")) p, e, err = ipam.GetSubnetsPerCluster("11.0.0.0/16", "11.1.0.0/16", "cluster2") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).ToNot(gomega.HavePrefix("11.")) - gomega.Expect(p).To(gomega.HaveSuffix("/16")) - gomega.Expect(e).ToNot(gomega.HavePrefix("11.")) - gomega.Expect(e).To(gomega.HaveSuffix("/16")) + Expect(err).To(BeNil()) + Expect(p).ToNot(HavePrefix("11.")) + Expect(p).To(HaveSuffix("/16")) + Expect(e).ToNot(HavePrefix("11.")) + Expect(e).To(HaveSuffix("/16")) }) }) }) @@ -137,8 +147,8 @@ var _ = Describe("Ipam", func() { Context("and remaining network pools are not filled", func() { It("should map it to another network", func() { p, _, err := ipam.GetSubnetsPerCluster("172.16.0.0/12", "10.0.0.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).ToNot(gomega.Equal("172.16.0.0/12")) + Expect(err).To(BeNil()) + Expect(p).ToNot(Equal("172.16.0.0/12")) }) }) Context("and remaining network pools are filled", func() { @@ -146,21 +156,21 @@ var _ = Describe("Ipam", func() { It("should not allocate any network", func() { // Fill pool #1 err := fillNetworkPool(liqonet.Pools[0], ipam) - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Fill pool #2 err = fillNetworkPool(liqonet.Pools[1], ipam) - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Acquire a portion of the network pool p, e, err := ipam.GetSubnetsPerCluster("172.16.0.0/24", "172.16.1.0/24", "cluster5") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).To(gomega.Equal("172.16.0.0/24")) - gomega.Expect(e).To(gomega.Equal("172.16.1.0/24")) + Expect(err).To(BeNil()) + Expect(p).To(Equal("172.16.0.0/24")) + Expect(e).To(Equal("172.16.1.0/24")) // Acquire network pool _, _, err = ipam.GetSubnetsPerCluster("172.16.0.0/12", "10.0.0.0/24", "cluster6") - gomega.Expect(err).ToNot(gomega.BeNil()) + Expect(err).ToNot(BeNil()) }) }) }) @@ -170,72 +180,72 @@ var _ = Describe("Ipam", func() { It("should not allocate the network (externalCidr not available: podCidr requested should be available after the call)", func() { // Fill pool #2 err := fillNetworkPool(liqonet.Pools[1], ipam) - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Fill pool #3 err = fillNetworkPool(liqonet.Pools[2], ipam) - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Fill 1st half of pool #1 err = ipam.AcquireReservedSubnet("10.0.0.0/9") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Cluster network request _, _, err = ipam.GetSubnetsPerCluster("10.128.0.0/9", "192.168.1.0/24", "cluster7") - gomega.Expect(err).ToNot(gomega.BeNil()) + Expect(err).ToNot(BeNil()) // Check if requested podCidr is available err = ipam.AcquireReservedSubnet("10.128.0.0/9") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) }) It("should not allocate the network (both)", func() { // Fill pool #1 err := fillNetworkPool(liqonet.Pools[0], ipam) - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Fill pool #2 err = fillNetworkPool(liqonet.Pools[1], ipam) - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Fill pool #3 err = fillNetworkPool(liqonet.Pools[2], ipam) - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Cluster network request _, _, err = ipam.GetSubnetsPerCluster("10.1.0.0/16", "10.0.0.0/24", "cluster7") - gomega.Expect(err).ToNot(gomega.BeNil()) + Expect(err).ToNot(BeNil()) }) }) Context("and the subnet has not already been assigned to any other cluster", func() { It("Should allocate the subnet itself, without mapping", func() { p, e, err := ipam.GetSubnetsPerCluster("10.0.0.0/16", "10.1.0.0/16", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).To(gomega.Equal("10.0.0.0/16")) - gomega.Expect(e).To(gomega.Equal("10.1.0.0/16")) + Expect(err).To(BeNil()) + Expect(p).To(Equal("10.0.0.0/16")) + Expect(e).To(Equal("10.1.0.0/16")) }) }) Context("and the subnet has already been assigned to another cluster", func() { Context("and there is an available network with the same mask length in one pool", func() { It("should map the requested network to another network taken by the pool", func() { p, e, err := ipam.GetSubnetsPerCluster("10.0.0.0/16", "10.1.0.0/16", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).To(gomega.Equal("10.0.0.0/16")) - gomega.Expect(e).To(gomega.Equal("10.1.0.0/16")) + Expect(err).To(BeNil()) + Expect(p).To(Equal("10.0.0.0/16")) + Expect(e).To(Equal("10.1.0.0/16")) p, e, err = ipam.GetSubnetsPerCluster("10.0.0.0/16", "10.1.0.0/16", "clustere") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).ToNot(gomega.Equal("10.0.0.0/16")) - gomega.Expect(e).ToNot(gomega.Equal("10.1.0.0/16")) + Expect(err).To(BeNil()) + Expect(p).ToNot(Equal("10.0.0.0/16")) + Expect(e).ToNot(Equal("10.1.0.0/16")) }) }) Context("and there is not an available network with the same mask length in any pool", func() { It("should fail to allocate the network", func() { p, _, err := ipam.GetSubnetsPerCluster("10.0.0.0/9", "10.1.0.0/16", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).To(gomega.Equal("10.0.0.0/9")) + Expect(err).To(BeNil()) + Expect(p).To(Equal("10.0.0.0/9")) _, _, err = ipam.GetSubnetsPerCluster("10.0.0.0/9", "10.3.0.0/16", "cluster2") - gomega.Expect(err).ToNot(gomega.BeNil()) + Expect(err).ToNot(BeNil()) }) }) }) @@ -245,21 +255,21 @@ var _ = Describe("Ipam", func() { Context("Freeing cluster networks that exist", func() { It("Should successfully free the subnets", func() { p, e, err := ipam.GetSubnetsPerCluster("10.0.1.0/24", "10.0.2.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).To(gomega.Equal("10.0.1.0/24")) - gomega.Expect(e).To(gomega.Equal("10.0.2.0/24")) + Expect(err).To(BeNil()) + Expect(p).To(Equal("10.0.1.0/24")) + Expect(e).To(Equal("10.0.2.0/24")) err = ipam.FreeSubnetsPerCluster("cluster1") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) p, e, err = ipam.GetSubnetsPerCluster("10.0.1.0/24", "10.0.2.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).To(gomega.Equal("10.0.1.0/24")) - gomega.Expect(e).To(gomega.Equal("10.0.2.0/24")) + Expect(err).To(BeNil()) + Expect(p).To(Equal("10.0.1.0/24")) + Expect(e).To(Equal("10.0.2.0/24")) }) }) Context("Freeing a cluster network that does not exists", func() { It("Should return no errors", func() { err := ipam.FreeSubnetsPerCluster("cluster1") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) }) }) }) @@ -267,29 +277,29 @@ var _ = Describe("Ipam", func() { Context("Freeing a network that has been reserved previously", func() { It("Should successfully free the subnet", func() { err := ipam.AcquireReservedSubnet("10.0.1.0/24") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) err = ipam.FreeReservedSubnet("10.0.1.0/24") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) err = ipam.AcquireReservedSubnet("10.0.1.0/24") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) }) }) Context("Freeing a cluster network that does not exists", func() { It("Should return no errors", func() { err := ipam.FreeReservedSubnet("10.0.1.0/24") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) }) }) Context("Freeing a reserved subnet equal to a network pool", func() { It("Should make available the network pool", func() { err := ipam.AcquireReservedSubnet("10.0.0.0/8") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) err = ipam.FreeReservedSubnet("10.0.0.0/8") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) p, e, err := ipam.GetSubnetsPerCluster("10.0.0.0/16", "10.2.0.0/16", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).To(gomega.Equal("10.0.0.0/16")) - gomega.Expect(e).To(gomega.Equal("10.2.0.0/16")) + Expect(err).To(BeNil()) + Expect(p).To(Equal("10.0.0.0/16")) + Expect(e).To(Equal("10.2.0.0/16")) }) }) }) @@ -297,35 +307,36 @@ var _ = Describe("Ipam", func() { It("ipam should retrieve configuration by resource", func() { // Assign networks to cluster p, e, err := ipam.GetSubnetsPerCluster("10.0.1.0/24", "10.0.2.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).To(gomega.Equal("10.0.1.0/24")) - gomega.Expect(e).To(gomega.Equal("10.0.2.0/24")) + Expect(err).To(BeNil()) + Expect(p).To(Equal("10.0.1.0/24")) + Expect(e).To(Equal("10.0.2.0/24")) // Simulate re-scheduling + ipam.StopGRPCServer() ipam = liqonet.NewIPAM() - err = ipam.Init(liqonet.Pools, dynClient) - gomega.Expect(err).To(gomega.BeNil()) + err = ipam.Init(liqonet.Pools, dynClient, 2000+rand.Intn(2000)) + Expect(err).To(BeNil()) // Another cluster asks for the same networks p, e, err = ipam.GetSubnetsPerCluster("10.0.1.0/24", "10.0.2.0/24", "cluster2") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).ToNot(gomega.Equal("10.0.1.0/24")) - gomega.Expect(e).ToNot(gomega.Equal("10.0.2.0/24")) + Expect(err).To(BeNil()) + Expect(p).ToNot(Equal("10.0.1.0/24")) + Expect(e).ToNot(Equal("10.0.2.0/24")) }) }) Describe("AddNetworkPool", func() { Context("Trying to add a default network pool", func() { It("Should generate an error", func() { err := ipam.AddNetworkPool("10.0.0.0/8") - gomega.Expect(err).ToNot(gomega.BeNil()) + Expect(err).ToNot(BeNil()) }) }) Context("Trying to add twice the same network pool", func() { It("Should generate an error", func() { err := ipam.AddNetworkPool("11.0.0.0/8") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) err = ipam.AddNetworkPool("11.0.0.0/8") - gomega.Expect(err).ToNot(gomega.BeNil()) + Expect(err).ToNot(BeNil()) }) }) Context("After adding a new network pool", func() { @@ -333,36 +344,36 @@ var _ = Describe("Ipam", func() { // Reserve default network pools for _, network := range liqonet.Pools { err := fillNetworkPool(network, ipam) - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) } // Add new network pool err := ipam.AddNetworkPool("11.0.0.0/8") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Reserve a given network err = ipam.AcquireReservedSubnet("12.0.0.0/24") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Reserve a given network err = ipam.AcquireReservedSubnet("12.0.1.0/24") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // IPAM should use 11.0.0.0/8 to map the cluster network p, e, err := ipam.GetSubnetsPerCluster("12.0.0.0/24", "12.0.1.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).To(gomega.HavePrefix("11")) - gomega.Expect(p).To(gomega.HaveSuffix("/24")) - gomega.Expect(e).To(gomega.HavePrefix("11")) - gomega.Expect(e).To(gomega.HaveSuffix("/24")) + Expect(err).To(BeNil()) + Expect(p).To(HavePrefix("11")) + Expect(p).To(HaveSuffix("/24")) + Expect(e).To(HavePrefix("11")) + Expect(e).To(HaveSuffix("/24")) }) }) Context("Trying to add a network pool that overlaps with a reserved network", func() { It("Should generate an error", func() { err := ipam.AcquireReservedSubnet("11.0.0.0/8") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) err = ipam.AddNetworkPool("11.0.0.0/16") - gomega.Expect(err).ToNot(gomega.BeNil()) + Expect(err).ToNot(BeNil()) }) }) }) @@ -370,7 +381,7 @@ var _ = Describe("Ipam", func() { Context("Remove a network pool that does not exist", func() { It("Should return an error", func() { err := ipam.RemoveNetworkPool("11.0.0.0/8") - gomega.Expect(err).ToNot(gomega.BeNil()) + Expect(err).ToNot(BeNil()) }) }) Context("Remove a network pool that exists", func() { @@ -378,34 +389,34 @@ var _ = Describe("Ipam", func() { // Reserve default network pools for _, network := range liqonet.Pools { err := ipam.AcquireReservedSubnet(network) - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) } // Add new network pool err := ipam.AddNetworkPool("11.0.0.0/8") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Remove network pool err = ipam.RemoveNetworkPool("11.0.0.0/8") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Reserve a given network err = ipam.AcquireReservedSubnet("12.0.0.0/24") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Reserve a given network err = ipam.AcquireReservedSubnet("12.0.1.0/24") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Should fail to assign a network to cluster _, _, err = ipam.GetSubnetsPerCluster("12.0.0.0/24", "12.0.1.0/24", "cluster1") - gomega.Expect(err).ToNot(gomega.BeNil()) + Expect(err).ToNot(BeNil()) }) }) Context("Remove a network pool that is a default one", func() { It("Should generate an error", func() { err := ipam.RemoveNetworkPool("10.0.0.0/8") - gomega.Expect(err).ToNot(gomega.BeNil()) + Expect(err).ToNot(BeNil()) }) }) Context("Remove a network pool that is used for a cluster", func() { @@ -413,64 +424,497 @@ var _ = Describe("Ipam", func() { // Reserve default network pools for _, network := range liqonet.Pools { err := ipam.AcquireReservedSubnet(network) - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) } // Add new network pool err := ipam.AddNetworkPool("11.0.0.0/8") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Reserve a network err = ipam.AcquireReservedSubnet("12.0.0.0/24") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // Reserve a network err = ipam.AcquireReservedSubnet("12.0.1.0/24") - gomega.Expect(err).To(gomega.BeNil()) + Expect(err).To(BeNil()) // IPAM should use 11.0.0.0/8 to map the cluster network p, e, err := ipam.GetSubnetsPerCluster("12.0.0.0/24", "12.0.1.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - gomega.Expect(p).To(gomega.HavePrefix("11")) - gomega.Expect(p).To(gomega.HaveSuffix("/24")) - gomega.Expect(e).To(gomega.HavePrefix("11")) - gomega.Expect(e).To(gomega.HaveSuffix("/24")) + Expect(err).To(BeNil()) + Expect(p).To(HavePrefix("11")) + Expect(p).To(HaveSuffix("/24")) + Expect(e).To(HavePrefix("11")) + Expect(e).To(HaveSuffix("/24")) err = ipam.RemoveNetworkPool("11.0.0.0/8") - gomega.Expect(err).ToNot(gomega.BeNil()) + Expect(err).ToNot(BeNil()) + }) + }) + }) + + Describe("AddLocalSubnetsPerCluster", func() { + Context("If the networks do not exist yet", func() { + It("should return no errors", func() { + err := ipam.AddLocalSubnetsPerCluster("10.0.0.0/24", "192.168.0.0/24", "cluster1") + Expect(err).To(BeNil()) + }) + }) + Context("If the networks already exist", func() { + It("should return no errors", func() { + err := ipam.AddLocalSubnetsPerCluster("10.0.0.0/24", "192.168.0.0/24", "cluster1") + Expect(err).To(BeNil()) + err = ipam.AddLocalSubnetsPerCluster("10.0.0.0/24", "192.168.0.0/24", "cluster1") + Expect(err).To(BeNil()) + }) + }) + }) + + Describe("RemoveLocalSubnetsPerCluster", func() { + Context("If the networks do not exist", func() { + It("should return no errors", func() { + err := ipam.RemoveLocalSubnetsPerCluster("cluster1") + Expect(err).To(BeNil()) + }) + }) + Context("If the networks exist", func() { + It("should return no errors", func() { + err := ipam.AddLocalSubnetsPerCluster("10.0.0.0/24", "192.168.0.0/24", "cluster1") + Expect(err).To(BeNil()) + err = ipam.RemoveLocalSubnetsPerCluster("cluster1") + Expect(err).To(BeNil()) }) }) }) - Describe("AddExternalCIDRPerCluster", func() { - Context("If the remote externalCIDR does not exist yet", func() { + + Describe("GetExternalCIDR", func() { + Context("Invoking it twice", func() { It("should return no errors", func() { - err := ipam.AddExternalCIDRPerCluster("10.0.0.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) + e, err := ipam.GetExternalCIDR(24) + Expect(err).To(BeNil()) + Expect(e).To(HaveSuffix("/24")) + _, err = ipam.GetExternalCIDR(24) + Expect(err).To(BeNil()) }) }) - Context("If the remote ExternalCIDR already exists", func() { + Context("Using a valid mask length", func() { It("should return no errors", func() { - err := ipam.AddExternalCIDRPerCluster("10.0.0.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - err = ipam.AddExternalCIDRPerCluster("10.0.0.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) + e, err := ipam.GetExternalCIDR(24) + Expect(err).To(BeNil()) + Expect(e).To(HaveSuffix("/24")) + }) + }) + Context("Using an invalid mask length", func() { + It("should return an error", func() { + _, err := ipam.GetExternalCIDR(33) + Expect(err).ToNot(BeNil()) }) }) }) - Describe("RemoveExternalCIDRPerCluster", func() { - Context("If the remote externalCIDR does not exist", func() { + Describe("SetPodCIDR", func() { + Context("Invoking func for the first time", func() { + It("should return no errors", func() { + err := ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + }) + }) + Context("Later invocation with the same PodCIDR", func() { + It("should return no errors", func() { + err := ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + err = ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + }) + }) + Context("Later invocation with a different PodCIDR", func() { + It("should return no errors", func() { + err := ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + err = ipam.SetPodCIDR("10.0.1.0/24") + Expect(err).ToNot(BeNil()) + }) + }) + Context("Using a reserved network", func() { + It("should return an error", func() { + err := ipam.AcquireReservedSubnet("10.0.1.0/24") + Expect(err).To(BeNil()) + err = ipam.SetPodCIDR("10.0.1.0/24") + Expect(err).ToNot(BeNil()) + }) + }) + }) + Describe("SetServiceCIDR", func() { + Context("Invoking func for the first time", func() { + It("should return no errors", func() { + err := ipam.SetServiceCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + }) + }) + Context("Later invocation with the same ServiceCIDR", func() { It("should return no errors", func() { - err := ipam.RemoveExternalCIDRPerCluster("cluster1") - gomega.Expect(err).To(gomega.BeNil()) + err := ipam.SetServiceCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + err = ipam.SetServiceCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) }) }) - Context("If the remote ExternalCIDR already exists", func() { + Context("Later invocation with a different ServiceCIDR", func() { It("should return no errors", func() { - err := ipam.AddExternalCIDRPerCluster("10.0.0.0/24", "cluster1") - gomega.Expect(err).To(gomega.BeNil()) - err = ipam.RemoveExternalCIDRPerCluster("cluster1") - gomega.Expect(err).To(gomega.BeNil()) + err := ipam.SetServiceCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + err = ipam.SetServiceCIDR("10.0.1.0/24") + Expect(err).ToNot(BeNil()) + }) + }) + Context("Using a reserved network", func() { + It("should return an error", func() { + err := ipam.AcquireReservedSubnet("10.0.1.0/24") + Expect(err).To(BeNil()) + err = ipam.SetServiceCIDR("10.0.1.0/24") + Expect(err).ToNot(BeNil()) + }) + }) + }) + Describe("MapEndpointIP", func() { + Context("If the endpoint IP belongs to local PodCIDR", func() { + Context("and the remote cluster has not remapped the local PodCIDR", func() { + It("should return the same IP address", func() { + // Set PodCIDR + err := ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + + // Remote cluster has not remapped local PodCIDR + err = ipam.AddLocalSubnetsPerCluster("None", "None", "cluster1") + Expect(err).To(BeNil()) + + response, err := ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: "10.0.0.1", + }) + Expect(err).To(BeNil()) + Expect(response.GetIp()).To(Equal("10.0.0.1")) + }) + }) + Context("and the remote cluster has remapped the local PodCIDR", func() { + It("should map the endpoint IP using the remapped PodCIDR", func() { + // Set PodCIDR + err := ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + + // Remote cluster has remapped local PodCIDR + err = ipam.AddLocalSubnetsPerCluster("192.168.0.0/24", "None", "cluster1") + Expect(err).To(BeNil()) + + response, err := ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: "10.0.0.1", + }) + Expect(err).To(BeNil()) + Expect(response.GetIp()).To(Equal("192.168.0.1")) + }) + }) + }) + Context("If the endpoint IP does not belong to local PodCIDR", func() { + Context("and the remote cluster has not remapped the local ExternalCIDR", func() { + It("should map the endpoint IP to a new IP belonging to local ExternalCIDR", func() { + // Set PodCIDR + err := ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + + // Set ExternalCIDR + externalCIDR, err := ipam.GetExternalCIDR(24) + Expect(err).To(BeNil()) + Expect(externalCIDR).To(HaveSuffix("/24")) + + // Remote cluster has not remapped local ExternalCIDR + err = ipam.AddLocalSubnetsPerCluster("None", "None", "cluster1") + Expect(err).To(BeNil()) + + response, err := ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: "20.0.0.1", + }) + Expect(err).To(BeNil()) + slicedPrefix := strings.SplitN(externalCIDR, ".", 4) + slicedPrefix = slicedPrefix[:len(slicedPrefix)-1] + Expect(response.GetIp()).To(HavePrefix(strings.Join(slicedPrefix, "."))) + }) + It("should return the same IP if more remote clusters ask for the same endpoint", func() { + // Set PodCIDR + err := ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + + // Set ExternalCIDR + externalCIDR, err := ipam.GetExternalCIDR(24) + Expect(err).To(BeNil()) + Expect(externalCIDR).To(HaveSuffix("/24")) + + err = ipam.AddLocalSubnetsPerCluster("None", "None", "cluster1") + Expect(err).To(BeNil()) + err = ipam.AddLocalSubnetsPerCluster("None", "None", "cluster2") + Expect(err).To(BeNil()) + + // Reflection cluster1 + response, err := ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: "20.0.0.1", + }) + Expect(err).To(BeNil()) + slicedPrefix := strings.SplitN(externalCIDR, ".", 4) + slicedPrefix = slicedPrefix[:len(slicedPrefix)-1] + Expect(response.GetIp()).To(HavePrefix(strings.Join(slicedPrefix, "."))) + expectedIp := response.GetIp() + + // Reflection cluster2 + response, err = ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: "20.0.0.1", + }) + Expect(err).To(BeNil()) + Expect(response.GetIp()).To(Equal(expectedIp)) + }) + }) + Context("and the remote cluster has remapped the local ExternalCIDR", func() { + It("should map the endpoint IP to a new IP belonging to the remapped ExternalCIDR", func() { + // Set PodCIDR + err := ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + + // Set ExternalCIDR + e, err := ipam.GetExternalCIDR(24) + Expect(err).To(BeNil()) + Expect(e).To(HaveSuffix("/24")) + + // Remote cluster has remapped local ExternalCIDR + err = ipam.AddLocalSubnetsPerCluster("None", "192.168.0.0/24", "cluster1") + Expect(err).To(BeNil()) + + response, err := ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: "20.0.0.1", + }) + Expect(err).To(BeNil()) + Expect(response.GetIp()).To(HavePrefix("192.168.0.")) + }) + }) + Context("and the ExternalCIDR has not any more available IPs", func() { + It("should return an error", func() { + var response *liqonet.MapResponse + var err error + // Set PodCIDR + err = ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + + // Set ExternalCIDR + externalCIDR, err := ipam.GetExternalCIDR(24) + Expect(err).To(BeNil()) + Expect(externalCIDR).To(HaveSuffix("/24")) + slicedPrefix := strings.SplitN(externalCIDR, ".", 4) + slicedPrefix = slicedPrefix[:len(slicedPrefix)-1] + + // Remote cluster has not remapped local ExternalCIDR + err = ipam.AddLocalSubnetsPerCluster("None", "None", "cluster1") + Expect(err).To(BeNil()) + + // Fill up ExternalCIDR + for i := 0; i < 254; i++ { + response, err = ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: fmt.Sprintf("20.0.0.%d", i), + }) + Expect(err).To(BeNil()) + Expect(response.GetIp()).To(HavePrefix(strings.Join(slicedPrefix, "."))) + } + + _, err = ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: "3.100.0.9", + }) + Expect(err).ToNot(BeNil()) + }) + }) + Context("Using an invalid endpointIP", func() { + It("should return an error", func() { + _, err := ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: "30.0.9", + }) + Expect(err).ToNot(BeNil()) + }) + }) + Context("If the local PodCIDR is not set", func() { + It("should return an error", func() { + _, err := ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: "30.0.4.9", + }) + Expect(err.Error()).To(ContainSubstring("cluster PodCIDR is not set")) + }) + }) + Context("If the remote cluster has not a PodCIDR set", func() { + It("should return an error", func() { + // Set PodCIDR + err := ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + + _, err = ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: "10.0.0.9", + }) + Expect(err.Error()).To(ContainSubstring("remote cluster cluster1 has not a local NAT PodCIDR")) + }) + }) + Context("If the remote cluster has not an ExternalCIDR set", func() { + It("should return an error", func() { + // Set PodCIDR + err := ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + + _, err = ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: "30.0.4.9", + }) + Expect(err.Error()).To(ContainSubstring("remote cluster cluster1 has not a remote ExternalCIDR")) + }) + }) + }) + }) + Describe("UnmapEndpointIP", func() { + Context("If there are no more clusters using an endpointIP", func() { + It("should free the relative IP", func() { + // Set PodCIDR + err := ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + + // Set ExternalCIDR + externalCIDR, err := ipam.GetExternalCIDR(24) + Expect(err).To(BeNil()) + Expect(externalCIDR).To(HaveSuffix("/24")) + slicedPrefix := strings.SplitN(externalCIDR, ".", 4) + slicedPrefix = slicedPrefix[:len(slicedPrefix)-1] + + err = ipam.AddLocalSubnetsPerCluster("None", "None", "cluster1") + Expect(err).To(BeNil()) + err = ipam.AddLocalSubnetsPerCluster("None", "None", "cluster2") + Expect(err).To(BeNil()) + + // Reflection in cluster1 + response, err := ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: "20.0.0.1", + }) + Expect(err).To(BeNil()) + Expect(response.GetIp()).To(HavePrefix(strings.Join(slicedPrefix, "."))) + ip := response.GetIp() + + // Reflection in cluster2 + _, err = ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster2", + Ip: "20.0.0.1", + }) + Expect(err).To(BeNil()) + + // Terminate reflection in cluster1 + _, err = ipam.UnmapEndpointIP(context.Background(), &liqonet.UnmapRequest{ + ClusterID: "cluster1", + Ip: "20.0.0.1", + }) + Expect(err).To(BeNil()) + + // Terminate reflection in cluster2 + _, err = ipam.UnmapEndpointIP(context.Background(), &liqonet.UnmapRequest{ + ClusterID: "cluster2", + Ip: "20.0.0.1", + }) + Expect(err).To(BeNil()) + + /* In order to check if the IP has been freed, simulate further reflections + till the ExternalCIDR has no more IPs and check if the returned IP is equal to + the freed IP. + An alternative could be to overwrite the stdout and check + existence of the log "IP has been freed".*/ + var found bool + for i := 0; i < 254; i++ { + err = ipam.AddLocalSubnetsPerCluster("None", "None", fmt.Sprintf("c%d", i)) + Expect(err).To(BeNil()) + r, err := ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: fmt.Sprintf("c%d", i), + Ip: fmt.Sprintf("30.0.0.%d", i), + }) + Expect(err).To(BeNil()) + if r.GetIp() == ip { + found = true + break + } + } + if !found { + Fail("ip has not been freed") + } + }) + }) + Context("If there are other clusters using an endpointIP", func() { + It("should not free the relative IP", func() { + // Set PodCIDR + err := ipam.SetPodCIDR("10.0.0.0/24") + Expect(err).To(BeNil()) + + // Set ExternalCIDR + externalCIDR, err := ipam.GetExternalCIDR(24) + Expect(err).To(BeNil()) + Expect(externalCIDR).To(HaveSuffix("/24")) + slicedPrefix := strings.SplitN(externalCIDR, ".", 4) + slicedPrefix = slicedPrefix[:len(slicedPrefix)-1] + + err = ipam.AddLocalSubnetsPerCluster("None", "None", "cluster1") + Expect(err).To(BeNil()) + err = ipam.AddLocalSubnetsPerCluster("None", "None", "cluster2") + Expect(err).To(BeNil()) + + // Reflection in cluster1 + response, err := ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster1", + Ip: "20.0.0.1", + }) + Expect(err).To(BeNil()) + Expect(response.GetIp()).To(HavePrefix(strings.Join(slicedPrefix, "."))) + ip := response.GetIp() + + // Reflection in cluster2 + _, err = ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: "cluster2", + Ip: "20.0.0.1", + }) + Expect(err).To(BeNil()) + + // Terminate reflection in cluster2 + _, err = ipam.UnmapEndpointIP(context.Background(), &liqonet.UnmapRequest{ + ClusterID: "cluster2", + Ip: "20.0.0.1", + }) + Expect(err).To(BeNil()) + + /* In order to check if the IP has been freed, simulate further reflections + till the ExternalCIDR has no more IPs and check if the returned IP is equal to + the freed IP. + An alternative could be to overwrite the stdout and check + existence of the log "IP has been freed".*/ + var found bool + for i := 0; i < 254; i++ { + err = ipam.AddLocalSubnetsPerCluster("None", "None", fmt.Sprintf("c%d", i)) + Expect(err).To(BeNil()) + r, _ := ipam.MapEndpointIP(context.Background(), &liqonet.MapRequest{ + ClusterID: fmt.Sprintf("c%d", i), + Ip: fmt.Sprintf("30.0.0.%d", i), + }) + if r != nil && r.GetIp() == ip { + found = true + break + } + } + if found { + Fail("ip has been freed") + } }) }) }) diff --git a/pkg/liqonet/test/doc.go b/pkg/liqonet/test/doc.go new file mode 100644 index 0000000000..d0910bb537 --- /dev/null +++ b/pkg/liqonet/test/doc.go @@ -0,0 +1,2 @@ +// Package test provides a mock type for IPAM module +package test diff --git a/pkg/liqonet/test/ipam_mock.go b/pkg/liqonet/test/ipam_mock.go new file mode 100644 index 0000000000..ef1710dd75 --- /dev/null +++ b/pkg/liqonet/test/ipam_mock.go @@ -0,0 +1,35 @@ +package test + +import ( + "context" + + grpc "google.golang.org/grpc" + + "github.com/liqotech/liqo/pkg/liqonet" +) + +// MockIpam mocks the IPAM module. +type MockIpam struct { + RemappedPodCIDR string +} + +// MapEndpointIP mocks the corresponding func in IPAM. +func (mock *MockIpam) MapEndpointIP( + ctx context.Context, + in *liqonet.MapRequest, + opts ...grpc.CallOption) (*liqonet.MapResponse, error) { + oldIP := in.GetIp() + newIP, err := liqonet.MapIPToNetwork(mock.RemappedPodCIDR, oldIP) + if err != nil { + return &liqonet.MapResponse{}, err + } + return &liqonet.MapResponse{Ip: newIP}, nil +} + +// UnmapEndpointIP mocks the corresponding func in IPAM. +func (mock *MockIpam) UnmapEndpointIP( + ctx context.Context, + in *liqonet.UnmapRequest, + opts ...grpc.CallOption) (*liqonet.UnmapResponse, error) { + return &liqonet.UnmapResponse{}, nil +} diff --git a/pkg/liqonet/utils.go b/pkg/liqonet/utils.go index 3bb655049f..f6316f029f 100644 --- a/pkg/liqonet/utils.go +++ b/pkg/liqonet/utils.go @@ -20,6 +20,7 @@ import ( "k8s.io/klog" "github.com/liqotech/liqo/internal/utils/errdefs" + liqoconst "github.com/liqotech/liqo/pkg/consts" ) var ( @@ -27,7 +28,38 @@ var ( ShutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGKILL} ) -// GetPodIP gets the ip of the pod passed as an environment variable. +// MapIPToNetwork creates a new IP address obtained by means of the old IP address and the new network. +func MapIPToNetwork(newNetwork, oldIP string) (newIP string, err error) { + if newNetwork == liqoconst.DefaultCIDRValue { + return oldIP, nil + } + // Parse newNetwork + ip, network, err := net.ParseCIDR(newNetwork) + if err != nil { + return "", err + } + // Get mask + mask := network.Mask + // Get slice of bytes for newNetwork + // Type net.IP has underlying type []byte + parsedNewIP := ip.To4() + // Get oldIP as slice of bytes + parsedOldIP := net.ParseIP(oldIP) + if parsedOldIP == nil { + return "", fmt.Errorf("cannot parse oldIP") + } + parsedOldIP = parsedOldIP.To4() + // Substitute the last 32-mask bits of newNetwork with bits taken by the old ip + for i := 0; i < len(mask); i++ { + // Step 1: NOT(mask[i]) = mask[i] ^ 0xff. They are the 'host' bits + // Step 2: BITWISE AND between the host bits and parsedOldIP[i] zeroes the network bits in parsedOldIP[i] + // Step 3: BITWISE OR copies the result of step 2 in newIP[i] + parsedNewIP[i] |= (mask[i] ^ 0xff) & parsedOldIP[i] + } + newIP = parsedNewIP.String() + return +} + func GetPodIP() (net.IP, error) { ipAddress, isSet := os.LookupEnv("POD_IP") if !isSet { @@ -140,14 +172,14 @@ func EnableIPForwarding() error { return nil } -// GetMask helper function to get a mask from a CIDR. +// GetMask retrieves the mask from a CIDR. func GetMask(network string) uint8 { _, net, _ := net.ParseCIDR(network) ones, _ := net.Mask.Size() return uint8(ones) } -// SetMask helper function that forges a new cidr from a network cidr and a mask. +// SetMask forges a new cidr from a network cidr and a mask. func SetMask(network string, mask uint8) (string, error) { _, n, err := net.ParseCIDR(network) if err != nil { @@ -207,7 +239,7 @@ func DeleteIFaceByIndex(ifaceIndex int) error { return err } -// IsValidCIDR return en error if the subnet passed in not in the CIDR notation. +// IsValidCIDR returns an error if the received CIDR is invalid. func IsValidCIDR(cidr string) error { _, _, err := net.ParseCIDR(cidr) return err diff --git a/pkg/liqonet/utils_test.go b/pkg/liqonet/utils_test.go index 61ea455f23..55de790eab 100644 --- a/pkg/liqonet/utils_test.go +++ b/pkg/liqonet/utils_test.go @@ -5,6 +5,8 @@ import ( "time" . "github.com/onsi/ginkgo" + . "github.com/onsi/ginkgo/extensions/table" + "github.com/onsi/gomega" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -15,6 +17,25 @@ import ( ) var _ = Describe("Liqonet", func() { + DescribeTable("MapIPToNetwork", + func(oldIp, newPodCidr, expectedIP string, expectedErr string) { + ip, err := liqonet.MapIPToNetwork(oldIp, newPodCidr) + if expectedErr != "" { + gomega.Expect(err.Error()).To(gomega.Equal(expectedErr)) + } else { + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + } + gomega.Expect(ip).To(gomega.Equal(expectedIP)) + }, + Entry("Mapping 10.2.1.3 to 10.0.4.0/24", "10.0.4.0/24", "10.2.1.3", "10.0.4.3", ""), + Entry("Mapping 10.2.1.128 to 10.0.4.0/24", "10.0.4.0/24", "10.2.1.128", "10.0.4.128", ""), + Entry("Mapping 10.2.1.1 to 10.0.4.0/24", "10.0.4.0/24", "10.2.1.1", "10.0.4.1", ""), + Entry("Mapping 10.2.127.128 to 10.0.128.0/23", "10.0.128.0/23", "10.2.127.128", "10.0.129.128", ""), + Entry("Mapping 10.2.128.128 to 10.0.126.0/23", "10.0.127.0/23", "10.2.128.128", "10.0.127.128", ""), + Entry("Mapping 10.2.128.128 to 10.0.126.0/25", "10.0.126.0/25", "10.2.128.128", "10.0.126.0", ""), + Entry("Using an invalid newPodCidr", "10.0..0/25", "10.2.128.128", "", "invalid CIDR address: 10.0..0/25"), + Entry("Using an invalid oldIp", "10.0.0.0/25", "10.2...128", "", "cannot parse oldIP"), + ) Describe("Getting ClusterID info from ConfigMap", func() { diff --git a/pkg/virtualKubelet/apiReflection/reflectors/outgoing/apiTypes.go b/pkg/virtualKubelet/apiReflection/reflectors/outgoing/apiTypes.go index 0f014b1403..005b51811b 100644 --- a/pkg/virtualKubelet/apiReflection/reflectors/outgoing/apiTypes.go +++ b/pkg/virtualKubelet/apiReflection/reflectors/outgoing/apiTypes.go @@ -1,6 +1,13 @@ package outgoing import ( + "fmt" + + "google.golang.org/grpc" + "k8s.io/klog" + + liqoconst "github.com/liqotech/liqo/pkg/consts" + "github.com/liqotech/liqo/pkg/liqonet" apimgmt "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection" ri "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection/reflectors/reflectorsInterfaces" "github.com/liqotech/liqo/pkg/virtualKubelet/options" @@ -19,10 +26,19 @@ func configmapsReflectorBuilder(reflector ri.APIReflector, _ map[options.OptionK } func endpointslicesReflectorBuilder(reflector ri.APIReflector, opts map[options.OptionKey]options.Option) ri.OutgoingAPIReflector { + conn, err := grpc.Dial(fmt.Sprintf("%s:%d", liqoconst.NetworkManagerServiceName, liqoconst.NetworkManagerIpamPort), + grpc.WithInsecure(), + grpc.WithBlock()) + if err != nil { + klog.Error(err) + } + ipamClient := liqonet.NewIpamClient(conn) + return &EndpointSlicesReflector{ APIReflector: reflector, LocalRemappedPodCIDR: opts[types.LocalRemappedPodCIDR], VirtualNodeName: opts[types.VirtualNodeName], + IpamClient: ipamClient, } } diff --git a/pkg/virtualKubelet/apiReflection/reflectors/outgoing/endpointSlices.go b/pkg/virtualKubelet/apiReflection/reflectors/outgoing/endpointSlices.go index 1d103f5f6f..1f693ea0c7 100644 --- a/pkg/virtualKubelet/apiReflection/reflectors/outgoing/endpointSlices.go +++ b/pkg/virtualKubelet/apiReflection/reflectors/outgoing/endpointSlices.go @@ -2,6 +2,7 @@ package outgoing import ( "context" + "strings" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" @@ -12,10 +13,10 @@ import ( "k8s.io/client-go/util/retry" "k8s.io/klog" + "github.com/liqotech/liqo/pkg/liqonet" apimgmt "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection" "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection/reflectors" ri "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection/reflectors/reflectorsInterfaces" - "github.com/liqotech/liqo/pkg/virtualKubelet/forge" "github.com/liqotech/liqo/pkg/virtualKubelet/options" ) @@ -28,6 +29,7 @@ type EndpointSlicesReflector struct { LocalRemappedPodCIDR options.ReadOnlyOption VirtualNodeName options.ReadOnlyOption + IpamClient liqonet.IpamClient } func (r *EndpointSlicesReflector) SetSpecializedPreProcessingHandlers() { @@ -129,7 +131,7 @@ func (r *EndpointSlicesReflector) PreAdd(obj interface{}) (interface{}, watch.Ev OwnerReferences: svcOwnerRef, }, AddressType: discoveryv1beta1.AddressTypeIPv4, - Endpoints: filterEndpoints(epLocal, string(r.LocalRemappedPodCIDR.Value()), string(r.VirtualNodeName.Value())), + Endpoints: filterEndpoints(epLocal, r.IpamClient, string(r.VirtualNodeName.Value())), Ports: epLocal.Ports, } @@ -158,13 +160,14 @@ func (r *EndpointSlicesReflector) PreUpdate(newObj, _ interface{}) (interface{}, } RemoteEpSlice := oldRemoteObj.(*discoveryv1beta1.EndpointSlice).DeepCopy() - RemoteEpSlice.Endpoints = filterEndpoints(endpointSliceHome, string(r.LocalRemappedPodCIDR.Value()), string(r.VirtualNodeName.Value())) + RemoteEpSlice.Endpoints = filterEndpoints(endpointSliceHome, r.IpamClient, string(r.VirtualNodeName.Value())) RemoteEpSlice.Ports = endpointSliceHome.Ports return RemoteEpSlice, watch.Modified } func (r *EndpointSlicesReflector) PreDelete(obj interface{}) (interface{}, watch.EventType) { + clusterID := strings.TrimPrefix(string(r.VirtualNodeName.Value()), "liqo-") endpointSliceLocal := obj.(*discoveryv1beta1.EndpointSlice) nattedNs, err := r.NattingTable().NatNamespace(endpointSliceLocal.Namespace, false) if err != nil { @@ -173,21 +176,29 @@ func (r *EndpointSlicesReflector) PreDelete(obj interface{}) (interface{}, watch } endpointSliceLocal.Namespace = nattedNs + for _, endpoint := range endpointSliceLocal.Endpoints { + _, err := r.IpamClient.UnmapEndpointIP(context.Background(), &liqonet.UnmapRequest{ClusterID: clusterID, Ip: endpoint.Addresses[0]}) + if err != nil { + klog.Error(err) + } + } + return endpointSliceLocal, watch.Deleted } -func filterEndpoints(slice *discoveryv1beta1.EndpointSlice, podCidr string, nodeName string) []discoveryv1beta1.Endpoint { +func filterEndpoints(slice *discoveryv1beta1.EndpointSlice, ipamClient liqonet.IpamClient, nodeName string) []discoveryv1beta1.Endpoint { var epList []discoveryv1beta1.Endpoint // Two possibilities: (1) exclude all virtual nodes (2) for _, v := range slice.Endpoints { t := v.Topology["kubernetes.io/hostname"] if t != nodeName { - ip, err := forge.ChangePodIp(podCidr, v.Addresses[0]) + response, err := ipamClient.MapEndpointIP(context.Background(), + &liqonet.MapRequest{ClusterID: strings.TrimPrefix(nodeName, "liqo-"), Ip: v.Addresses[0]}) if err != nil { klog.Error(err) } newEp := discoveryv1beta1.Endpoint{ - Addresses: []string{ip}, + Addresses: []string{response.GetIp()}, Conditions: v.Conditions, Hostname: nil, TargetRef: nil, diff --git a/pkg/virtualKubelet/forge/forge_suite_test.go b/pkg/virtualKubelet/forge/forge_suite_test.go deleted file mode 100644 index 04a485c9c5..0000000000 --- a/pkg/virtualKubelet/forge/forge_suite_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package forge_test - -import ( - "testing" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -func TestForge(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Forge Suite") -} diff --git a/pkg/virtualKubelet/forge/pods.go b/pkg/virtualKubelet/forge/pods.go index 91a2b0b2f7..b726cd0250 100644 --- a/pkg/virtualKubelet/forge/pods.go +++ b/pkg/virtualKubelet/forge/pods.go @@ -2,7 +2,6 @@ package forge import ( "fmt" - "net" "strings" corev1 "k8s.io/api/core/v1" @@ -11,6 +10,7 @@ import ( "k8s.io/klog" liqoconst "github.com/liqotech/liqo/pkg/consts" + liqonet "github.com/liqotech/liqo/pkg/liqonet" "github.com/liqotech/liqo/pkg/virtualKubelet" apimgmt "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection" "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection/reflectors" @@ -55,12 +55,12 @@ func (f *apiForger) podStatusForeignToHome(foreignObj, homeObj runtime.Object) * homePod.Status = foreignPod.Status if homePod.Status.PodIP != "" { - newIp, err := ChangePodIp(f.remoteRemappedPodCidr.Value().ToString(), foreignPod.Status.PodIP) + newIP, err := liqonet.MapIPToNetwork(f.remoteRemappedPodCidr.Value().ToString(), foreignPod.Status.PodIP) if err != nil { klog.Error(err) } - homePod.Status.PodIP = newIp - homePod.Status.PodIPs[0].IP = newIp + homePod.Status.PodIP = newIP + homePod.Status.PodIPs[0].IP = newIP } if foreignPod.DeletionTimestamp != nil { @@ -188,37 +188,6 @@ func filterVolumeMounts(volumes []corev1.Volume, volumeMountsIn []corev1.VolumeM return volumeMounts } -// ChangePodIp creates a new IP address obtained by means of the old IP address and the new podCIDR. -func ChangePodIp(newPodCidr string, oldPodIp string) (newPodIp string, err error) { - if newPodCidr == liqoconst.DefaultCIDRValue { - return oldPodIp, nil - } - // Parse newPodCidr - ip, network, err := net.ParseCIDR(newPodCidr) - if err != nil { - return "", err - } - // Get mask - mask := network.Mask - // Get slice of bytes for newPodCidr - // Type net.IP has underlying type []byte - newIP := ip.To4() - // Get oldPodIp as slice of bytes - oldIP := net.ParseIP(oldPodIp) - if oldIP == nil { - return "", fmt.Errorf("cannot parse oldIp") - } - oldIP = oldIP.To4() - // Substitute the last 32-mask bits of newPodCidr(newIP) with bits taken by the old ip - for i := 0; i < len(mask); i++ { - // Step 1: NOT(mask[i]) = mask[i] ^ 0xff. They are the 'host' bits - // Step 2: BITWISE AND between the host bits and oldIP[i] zeroes the network bits in oldIP[i] - // Step 3: BITWISE OR copies the result of step 2 in newIP[i] - newIP[i] |= (mask[i] ^ 0xff) & oldIP[i] - } - return net.IP(newIP).String(), nil -} - func forgeAffinity() *corev1.Affinity { return &corev1.Affinity{ NodeAffinity: &corev1.NodeAffinity{ diff --git a/pkg/virtualKubelet/forge/pods_test.go b/pkg/virtualKubelet/forge/pods_test.go deleted file mode 100644 index 31f5c2e6ae..0000000000 --- a/pkg/virtualKubelet/forge/pods_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package forge_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/ginkgo/extensions/table" - "github.com/onsi/gomega" - - "github.com/liqotech/liqo/pkg/virtualKubelet/forge" -) - -var _ = Describe("Pods", func() { - DescribeTable("ChangePodIp", - func(oldIp, newPodCidr, expectedIP string, expectedErr string) { - ip, err := forge.ChangePodIp(oldIp, newPodCidr) - if expectedErr != "" { - gomega.Expect(err.Error()).To(gomega.Equal(expectedErr)) - } else { - gomega.Expect(err).ToNot(gomega.HaveOccurred()) - } - gomega.Expect(ip).To(gomega.Equal(expectedIP)) - }, - Entry("Mapping 10.2.1.3 to 10.0.4.0/24", "10.0.4.0/24", "10.2.1.3", "10.0.4.3", ""), - Entry("Mapping 10.2.1.128 to 10.0.4.0/24", "10.0.4.0/24", "10.2.1.128", "10.0.4.128", ""), - Entry("Mapping 10.2.1.1 to 10.0.4.0/24", "10.0.4.0/24", "10.2.1.1", "10.0.4.1", ""), - Entry("Mapping 10.2.127.128 to 10.0.128.0/23", "10.0.128.0/23", "10.2.127.128", "10.0.129.128", ""), - Entry("Mapping 10.2.128.128 to 10.0.126.0/23", "10.0.127.0/23", "10.2.128.128", "10.0.127.128", ""), - Entry("Mapping 10.2.128.128 to 10.0.126.0/25", "10.0.126.0/25", "10.2.128.128", "10.0.126.0", ""), - Entry("Using an invalid newPodCidr", "10.0..0/25", "10.2.128.128", "", "invalid CIDR address: 10.0..0/25"), - Entry("Using an invalid oldIp", "10.0.0.0/25", "10.2...128", "", "cannot parse oldIp"), - ) -}) diff --git a/test/unit/virtualKubelet/reflection/endpointslices_test.go b/test/unit/virtualKubelet/reflection/endpointslices_test.go index 8c779cf6b8..3629857700 100644 --- a/test/unit/virtualKubelet/reflection/endpointslices_test.go +++ b/test/unit/virtualKubelet/reflection/endpointslices_test.go @@ -11,6 +11,8 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/klog" + liqonetTest "github.com/liqotech/liqo/pkg/liqonet/test" + apimgmt "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection" api "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection/reflectors" "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection/reflectors/outgoing" @@ -37,6 +39,7 @@ func TestEndpointAdd(t *testing.T) { APIReflector: Greflector, LocalRemappedPodCIDR: types.NewNetworkingOption("localRemappedPodCIDR", "10.0.0.0/16"), VirtualNodeName: types.NewNetworkingOption("VirtualNodeName", "vk-node"), + IpamClient: &liqonetTest.MockIpam{RemappedPodCIDR: "10.0.0.0/16"}, } reflector.SetSpecializedPreProcessingHandlers() @@ -120,6 +123,7 @@ func TestEndpointAdd2(t *testing.T) { APIReflector: Greflector, LocalRemappedPodCIDR: types.NewNetworkingOption("localRemappedPodCIDR", "10.0.0.0/16"), VirtualNodeName: types.NewNetworkingOption("VirtualNodeName", "vk-node"), + IpamClient: &liqonetTest.MockIpam{RemappedPodCIDR: "10.0.0.0/16"}, } reflector.SetSpecializedPreProcessingHandlers()