Skip to content

Commit

Permalink
New functions for NatMappingInflater module
Browse files Browse the repository at this point in the history
Added AddMapping and TerminateNatMappingsPerCluster functions.
  • Loading branch information
davidefalcone1 committed Jun 16, 2021
1 parent 351455f commit 24fd15f
Show file tree
Hide file tree
Showing 9 changed files with 677 additions and 76 deletions.
7 changes: 7 additions & 0 deletions apis/net/v1alpha1/ipamstorage_types.go
Expand Up @@ -38,6 +38,9 @@ type Subnets struct {
// ClusterMapping is an empty struct.
type ClusterMapping struct{}

// ConfiguredCluster is an empty struct used as value for NatMappingsConfigured.
type ConfiguredCluster struct{}

// EndpointMapping describes a relation between an enpoint IP and an IP belonging to ExternalCIDR.
type EndpointMapping struct {
// IP belonging to cluster ExtenalCIDR assigned to this endpoint.
Expand All @@ -61,6 +64,10 @@ type IpamSpec struct {
ExternalCIDR string `json:"externalCIDR"`
// Endpoint IP mappings. Key is the IP address of the local endpoint, value is the IP of the remote endpoint, so it belongs to an ExternalCIDR
EndpointMappings map[string]EndpointMapping `json:"endpointMappings"`
// NatMappingsConfigured is a map that contains all the remote clusters
// for which NatMappings have been already configured.
// Key is a cluster ID, value is an empty struct.
NatMappingsConfigured map[string]ConfiguredCluster `json:"natMappingsConfigured"`
// Cluster PodCIDR
PodCIDR string `json:"podCIDR"`
// ServiceCIDR
Expand Down
22 changes: 22 additions & 0 deletions apis/net/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions deployments/liqo/crds/net.liqo.io_ipamstorages.yaml
Expand Up @@ -95,6 +95,15 @@ spec:
externalCIDR:
description: Cluster ExternalCIDR
type: string
natMappingsConfigured:
additionalProperties:
description: ConfiguredCluster is an empty struct used as value
for NatMappingsConfigured.
type: object
description: NatMappingsConfigured is a map that contains all the
remote clusters for which NatMappings have been already configured.
Key is a cluster ID, value is an empty struct.
type: object
podCIDR:
description: Cluster PodCIDR
type: string
Expand All @@ -118,6 +127,7 @@ spec:
- clusterSubnets
- endpointMappings
- externalCIDR
- natMappingsConfigured
- podCIDR
- pools
- prefixes
Expand Down
Expand Up @@ -224,8 +224,8 @@ func (tec *TunnelEndpointCreator) SetupSignalHandlerForTunEndCreator() context.C
go func(r *TunnelEndpointCreator) {
sig := <-c
klog.Infof("received signal: %s", sig.String())
// Stop IPAM gRPC server.
tec.IPManager.StopGRPCServer()
// Stop IPAM.
tec.IPManager.Terminate()
// closing shared informers
close(r.ForeignClusterStopWatcher)
done()
Expand Down
190 changes: 148 additions & 42 deletions pkg/liqonet/ipam.go
Expand Up @@ -2,6 +2,7 @@ package liqonet

import (
"context"
"errors"
"fmt"
"net"
"strings"
Expand Down Expand Up @@ -31,7 +32,7 @@ type Ipam interface {
reserved as well. The remapping mechanism can be applied on:
- PodCIDR
- ExternalCIDR
- Both
- Both.
*/
GetSubnetsPerCluster(podCidr, externalCIDR, clusterID string) (string, string, error)
// RemoveClusterConfig deletes the IPAM configuration of a remote cluster,
Expand All @@ -55,8 +56,8 @@ type Ipam interface {
SetPodCIDR(podCIDR string) error
// SetServiceCIDR sets the cluster ServiceCIDR.
SetServiceCIDR(serviceCIDR string) error
// StopGRPCServer stops the gRPC server gracefully.
StopGRPCServer()
// Terminate function enforces a graceful termination of the IPAM module.
Terminate()
IpamServer
}

Expand Down Expand Up @@ -122,6 +123,12 @@ func (liqoIPAM *IPAM) Init(pools []string, dynClient dynamic.Interface, listenin
return nil
}

// Terminate function stops the gRPC server.
func (liqoIPAM *IPAM) Terminate() {
// Stop GRPC server
liqoIPAM.grpcServer.GracefulStop()
}

func (liqoIPAM *IPAM) initRPCServer(port int) error {
lis, err := net.Listen("tcp", fmt.Sprintf("%s%d", "0.0.0.0:", port))
if err != nil {
Expand All @@ -138,11 +145,6 @@ func (liqoIPAM *IPAM) initRPCServer(port int) error {
return nil
}

// StopGRPCServer stops the gRPC server gracefully.
func (liqoIPAM *IPAM) StopGRPCServer() {
liqoIPAM.grpcServer.GracefulStop()
}

// reservePoolInHalves handles the special case in which a network pool has to be entirely reserved
// Since AcquireSpecificChildPrefix would return an error, reservePoolInHalves acquires the two
// halves of the network pool.
Expand Down Expand Up @@ -513,7 +515,7 @@ func (liqoIPAM *IPAM) FreeReservedSubnet(network string) error {
// deletes local subnets for the remote cluster.
func (liqoIPAM *IPAM) RemoveClusterConfig(clusterID string) error {
var subnets netv1alpha1.Subnets
var subnetsExist bool
var subnetsExist, natMappingsPerClusterConfigured bool

if clusterID == "" {
return &liqoneterrors.WrongParameter{
Expand All @@ -528,47 +530,56 @@ func (liqoIPAM *IPAM) RemoveClusterConfig(clusterID string) error {
return fmt.Errorf("cannot get cluster subnets: %w", err)
}

// Get NatMappingsConfigured map
natMappingsConfigured, err := liqoIPAM.ipamStorage.getNatMappingsConfigured()
if err != nil {
return fmt.Errorf("cannot establish if NatMappings for cluster %s have been configured: %w", clusterID, err)
}

subnets, subnetsExist = clusterSubnets[clusterID]
if !subnetsExist {
_, natMappingsPerClusterConfigured = natMappingsConfigured[clusterID]
if !subnetsExist && !natMappingsPerClusterConfigured {
// Nothing to be done here
return nil
}

// Free PodCidr
if err := liqoIPAM.FreeReservedSubnet(subnets.RemotePodCIDR); err != nil {
return err
// If an error happened after the following if, there is no need of
// re-executing the following block.
if subnetsExist {
// Free PodCidr
if err := liqoIPAM.FreeReservedSubnet(subnets.RemotePodCIDR); err != nil {
return err
}

// Free ExternalCidr
if err := liqoIPAM.FreeReservedSubnet(subnets.RemoteExternalCIDR); err != nil {
return err
}
klog.Infof("Networks assigned to cluster %s have just been freed", clusterID)

delete(clusterSubnets, clusterID)
if err := liqoIPAM.ipamStorage.updateClusterSubnets(clusterSubnets); err != nil {
return fmt.Errorf("cannot update clusterSubnets:%w", err)
}
}

// Free ExternalCidr
if err := liqoIPAM.FreeReservedSubnet(subnets.RemoteExternalCIDR); err != nil {
return err
// Terminate NatMappings
if err := liqoIPAM.terminateNatMappingsPerCluster(clusterID); err != nil {
return fmt.Errorf("unable to terminate NAT mappings for cluster %s: %w", clusterID, err)
}
klog.Infof("Networks assigned to cluster %s have just been freed", clusterID)

delete(clusterSubnets, clusterID)
if err := liqoIPAM.ipamStorage.updateClusterSubnets(clusterSubnets); err != nil {
return fmt.Errorf("cannot update clusterSubnets:%w", err)
delete(natMappingsConfigured, clusterID)
// Update natMappingsConfigured
natMappingsConfigured[clusterID] = netv1alpha1.ConfiguredCluster{}
if err := liqoIPAM.ipamStorage.updateNatMappingsConfigured(natMappingsConfigured); err != nil {
return fmt.Errorf("unable to update NatMappingsConfigured: %w", err)
}
return nil
}

// initNatMappingsPerCluster is a wrapper for inflater InitNatMappingsPerCluster.
func (liqoIPAM *IPAM) initNatMappingsPerCluster(clusterID string) error {
if clusterID == "" {
return &liqoneterrors.WrongParameter{
Parameter: consts.ClusterIDLabelName,
Reason: liqoneterrors.StringNotEmpty,
}
}
// Get cluster subnets
clusterSubnets, err := liqoIPAM.ipamStorage.getClusterSubnets()
if err != nil {
return fmt.Errorf("cannot get cluster subnets: %w", err)
}
subnets, exists := clusterSubnets[clusterID]
if !exists {
return fmt.Errorf("subnets of cluster %s are not set", clusterID)
}
func (liqoIPAM *IPAM) initNatMappingsPerCluster(clusterID string, subnets netv1alpha1.Subnets) error {
var err error
// InitNatMappingsPerCluster does need the Pod CIDR used in home cluster for remote pods (subnets.RemotePodCIDR)
// and the ExternalCIDR used in remote cluster for local exported resources.
var externalCIDR string
Expand All @@ -584,6 +595,71 @@ func (liqoIPAM *IPAM) initNatMappingsPerCluster(clusterID string) error {
return liqoIPAM.natMappingInflater.InitNatMappingsPerCluster(subnets.RemotePodCIDR, externalCIDR, clusterID)
}

// terminateNatMappingsPerCluster is used to update endpointMappings after a cluster peering is terminated.
func (liqoIPAM *IPAM) terminateNatMappingsPerCluster(clusterID string) error {
// Get NAT mappings
// natMappings keys are the set of endpoint reflected on remote cluster.
natMappings, err := liqoIPAM.natMappingInflater.GetNatMappings(clusterID)
if err != nil && !errors.Is(err, &liqoneterrors.MissingInit{
StructureName: clusterID,
}) {
// Unknown error
return fmt.Errorf("cannot get NAT mappings for cluster %s:%w", clusterID, err)
}
if err != nil && errors.Is(err, &liqoneterrors.MissingInit{
StructureName: clusterID,
}) {
/*
This can happen if:
a: terminateNatMappingsPerCluster has been called more than once after initialization.
b. terminateNatMappingsPerCluster has been called once without previous initialization.
In both circumstances, there are no actions to be performed here.
*/
return nil
}

// Get endpointMappings
endpointMappings, err := liqoIPAM.ipamStorage.getEndpointMappings()
if err != nil {
return fmt.Errorf("cannot get Endpoint IPs: %w", err)
}

// Remove cluster from the list of clusters the endpoint is reflected in.
for ip := range natMappings {
m := endpointMappings[ip]
delete(m.ClusterMappings, clusterID)
if len(m.ClusterMappings) == 0 {
// There are no more clusters using this endpoint IP

// Get local ExternalCIDR
localExternalCIDR, err := liqoIPAM.ipamStorage.getExternalCIDR()
if err != nil {
return fmt.Errorf("cannot get ExternalCIDR: %w", err)
}
// Free IP
if err := liqoIPAM.ipam.ReleaseIPFromPrefix(localExternalCIDR, endpointMappings[ip].IP); err != nil {
return fmt.Errorf("cannot free IP: %w", err)
}
klog.Infof("IP %s (mapped from %s) has been freed", endpointMappings[ip].IP, ip)
// Delete entry
delete(endpointMappings, ip)
} else {
endpointMappings[ip] = m
}
}

// Update endpointMappings
if err := liqoIPAM.ipamStorage.updateEndpointMappings(endpointMappings); err != nil {
return fmt.Errorf("cannot update endpointMappings:%w", err)
}

// Free/Remove resouces in Inflater
if err := liqoIPAM.natMappingInflater.TerminateNatMappingsPerCluster(clusterID); err != nil {
return err
}
return nil
}

// AddNetworkPool adds a network to the set of network pools.
func (liqoIPAM *IPAM) AddNetworkPool(network string) error {
// Get resource
Expand Down Expand Up @@ -683,21 +759,40 @@ func (liqoIPAM *IPAM) RemoveNetworkPool(network string) error {
// AddLocalSubnetsPerCluster stores how the PodCIDR and the ExternalCIDR of local cluster
// has been remapped in a remote cluster. If no remapping happened, then the CIDR value should be equal to "None".
func (liqoIPAM *IPAM) AddLocalSubnetsPerCluster(podCIDR, externalCIDR, clusterID string) error {
var exists bool
var subnetsExist, natMappingsPerClusterConfigured bool
var subnets netv1alpha1.Subnets
if clusterID == "" {
return &liqoneterrors.WrongParameter{
Parameter: consts.ClusterIDLabelName,
Reason: liqoneterrors.StringNotEmpty,
}
}

// Get cluster subnets
clusterSubnets, err := liqoIPAM.ipamStorage.getClusterSubnets()
if err != nil {
return fmt.Errorf("cannot get cluster subnets: %w", err)
}
// Check existence
subnets, exists = clusterSubnets[clusterID]
if exists && subnets.LocalNATPodCIDR != "" && subnets.LocalNATExternalCIDR != "" {

// Get NatMappingsConfigured map
natMappingsConfigured, err := liqoIPAM.ipamStorage.getNatMappingsConfigured()
if err != nil {
return fmt.Errorf("cannot establish if NatMappings for cluster %s have been configured: %w", clusterID, err)
}

// Check existence of subnets struct and NatMappings have already been configured
subnets, subnetsExist = clusterSubnets[clusterID]
_, natMappingsPerClusterConfigured = natMappingsConfigured[clusterID]
if !subnetsExist {
return fmt.Errorf("remote subnets for cluster %s do not exist yet. Call first GetSubnetsPerCluster",
clusterID)
}
if subnets.LocalNATPodCIDR != "" && subnets.LocalNATExternalCIDR != "" && natMappingsPerClusterConfigured {
return nil
}

// Set networks
if exists {
if subnetsExist {
subnets.LocalNATPodCIDR = podCIDR
subnets.LocalNATExternalCIDR = externalCIDR
} else {
Expand All @@ -718,9 +813,15 @@ func (liqoIPAM *IPAM) AddLocalSubnetsPerCluster(podCIDR, externalCIDR, clusterID
}

// Init NAT mappings
if err := liqoIPAM.initNatMappingsPerCluster(clusterID); err != nil {
if err := liqoIPAM.initNatMappingsPerCluster(clusterID, subnets); err != nil {
return fmt.Errorf("unable to initialize NAT mappings per cluster %s: %w", clusterID, err)
}

// Update natMappingsConfigured
natMappingsConfigured[clusterID] = netv1alpha1.ConfiguredCluster{}
if err := liqoIPAM.ipamStorage.updateNatMappingsConfigured(natMappingsConfigured); err != nil {
return fmt.Errorf("unable to update NatMappingsConfigured: %w", err)
}
return nil
}

Expand Down Expand Up @@ -813,6 +914,11 @@ func (liqoIPAM *IPAM) mapIPToExternalCIDR(clusterID, remoteExternalCIDR, ip stri
return "", fmt.Errorf("cannot map endpoint IP %s to ExternalCIDR:%w", endpointMapping.IP, err)
}

// Add NAT mapping
if err := liqoIPAM.natMappingInflater.AddMapping(ip, newIP, clusterID); err != nil {
return "", fmt.Errorf("cannot add NAT mapping: %w", err)
}

return newIP, nil
}

Expand Down

0 comments on commit 24fd15f

Please sign in to comment.