Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix vpc connetion can not got ready #25

Merged
merged 8 commits into from
Dec 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 68 additions & 170 deletions pkg/controller/vpcpeering/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package vpcpeering

import (
"context"
"fmt"
"reflect"
"strings"

"github.com/aws/aws-sdk-go-v2/service/ec2"
Expand Down Expand Up @@ -61,7 +63,8 @@ func SetupVPCPeeringConnection(mgr ctrl.Manager, l logging.Logger, rl workqueue.
return ctrl.NewControllerManagedBy(mgr).
Named(name).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewDefaultManagedRateLimiter(rl),
RateLimiter: ratelimiter.NewDefaultManagedRateLimiter(rl),
MaxConcurrentReconciles: 5,
}).
For(&svcapitypes.VPCPeeringConnection{}).
Complete(managed.NewReconciler(mgr,
Expand Down Expand Up @@ -148,11 +151,13 @@ func (e *external) Observe(ctx context.Context, mg cpresource.Managed) (managed.
input := peering.GenerateDescribeVpcPeeringConnectionsInput(cr)
resp, err := e.client.DescribeVpcPeeringConnectionsRequest(input).Send(ctx)
if err != nil {
return managed.ExternalObservation{ResourceExists: false}, awsclient.Wrap(err, errDescribe)
return managed.ExternalObservation{ResourceExists: false}, errors.Wrap(err, errDescribe)
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Describe VpcPeeringConnections")
// TODO: if user delete vpc peering in aws cloud, how ensure subresource deleted
if len(resp.VpcPeeringConnections) == 0 {
return managed.ExternalObservation{ResourceExists: true}, nil
return managed.ExternalObservation{ResourceExists: false}, nil
zjj2wry marked this conversation as resolved.
Show resolved Hide resolved
}

existedPeer := resp.VpcPeeringConnections[0]
Expand All @@ -161,6 +166,18 @@ func (e *external) Observe(ctx context.Context, mg cpresource.Managed) (managed.
return managed.ExternalObservation{ResourceExists: false}, nil
}

currentPeeringStatus := peering.BuildPeering(resp).Status.AtProvider

e.log.WithValues("VpcPeering", cr.Name).Debug("Build current peering status")

// update current peering status to status.atProvider
if !reflect.DeepEqual(currentPeeringStatus, cr.Status.AtProvider) {
currentPeeringStatus.DeepCopyInto(&cr.Status.AtProvider)
if err := e.kube.Status().Update(ctx, cr); err != nil {
return managed.ExternalObservation{ResourceExists: true}, err
}
}

_, routeTableReady := cr.GetAnnotations()[routeTableEnsured]
_, hostZoneReady := cr.GetAnnotations()[hostedZoneEnsured]
_, attributeReady := cr.GetAnnotations()[attributeModified]
Expand All @@ -172,7 +189,6 @@ func (e *external) Observe(ctx context.Context, mg cpresource.Managed) (managed.
}, errors.Wrap(e.kube.Status().Update(ctx, cr), errUpdateManagedStatus)
}

peering.BuildPeering(resp).Status.AtProvider.DeepCopyInto(&cr.Status.AtProvider)
if existedPeer.Status.Code == ec2.VpcPeeringConnectionStateReasonCodeActive && cr.GetCondition(ApprovedCondition).Status == corev1.ConditionTrue {
cr.Status.SetConditions(xpv1.Available())
}
Expand All @@ -194,9 +210,11 @@ func (e *external) Create(ctx context.Context, mg cpresource.Managed) (managed.E

resp, err := e.client.CreateVpcPeeringConnectionRequest(input).Send(ctx)
if err != nil {
return managed.ExternalCreation{}, awsclient.Wrap(err, errCreate)
return managed.ExternalCreation{}, errors.Wrap(err, "create VpcPeeringConnection")
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Create VpcPeeringConnectio successful")

tags := make([]ec2.Tag, 0)
tags = append(tags, ec2.Tag{
Key: aws.String("Name"),
Expand All @@ -217,169 +235,27 @@ func (e *external) Create(ctx context.Context, mg cpresource.Managed) (managed.E
Tags: tags,
}).Send(ctx)
if err != nil {
return managed.ExternalCreation{}, awsclient.Wrap(err, errCreate)
return managed.ExternalCreation{}, errors.Wrap(err, "create tag for vpc peering")
}

meta.SetExternalName(cr, aws.StringValue(resp.VpcPeeringConnection.VpcPeeringConnectionId))
e.log.WithValues("VpcPeering", cr.Name).Debug("Create tag for vpc peering successful")

if resp.VpcPeeringConnection.AccepterVpcInfo != nil {
f0 := &svcapitypes.VPCPeeringConnectionVPCInfo{}
if resp.VpcPeeringConnection.AccepterVpcInfo.CidrBlock != nil {
f0.CIDRBlock = resp.VpcPeeringConnection.AccepterVpcInfo.CidrBlock
}
if resp.VpcPeeringConnection.AccepterVpcInfo.CidrBlockSet != nil {
f0f1 := []*svcapitypes.CIDRBlock{}
for _, f0f1iter := range resp.VpcPeeringConnection.AccepterVpcInfo.CidrBlockSet {
f0f1elem := &svcapitypes.CIDRBlock{}
if f0f1iter.CidrBlock != nil {
f0f1elem.CIDRBlock = f0f1iter.CidrBlock
}
f0f1 = append(f0f1, f0f1elem)
}
f0.CIDRBlockSet = f0f1
}
if resp.VpcPeeringConnection.AccepterVpcInfo.Ipv6CidrBlockSet != nil {
f0f2 := []*svcapitypes.IPv6CIDRBlock{}
for _, f0f2iter := range resp.VpcPeeringConnection.AccepterVpcInfo.Ipv6CidrBlockSet {
f0f2elem := &svcapitypes.IPv6CIDRBlock{}
if f0f2iter.Ipv6CidrBlock != nil {
f0f2elem.IPv6CIDRBlock = f0f2iter.Ipv6CidrBlock
}
f0f2 = append(f0f2, f0f2elem)
}
f0.IPv6CIDRBlockSet = f0f2
}
if resp.VpcPeeringConnection.AccepterVpcInfo.OwnerId != nil {
f0.OwnerID = resp.VpcPeeringConnection.AccepterVpcInfo.OwnerId
}
if resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions != nil {
f0f4 := &svcapitypes.VPCPeeringConnectionOptionsDescription{}
if resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowDnsResolutionFromRemoteVpc != nil {
f0f4.AllowDNSResolutionFromRemoteVPC = resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowDnsResolutionFromRemoteVpc
}
if resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowEgressFromLocalClassicLinkToRemoteVpc != nil {
f0f4.AllowEgressFromLocalClassicLinkToRemoteVPC = resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowEgressFromLocalClassicLinkToRemoteVpc
}
if resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowEgressFromLocalVpcToRemoteClassicLink != nil {
f0f4.AllowEgressFromLocalVPCToRemoteClassicLink = resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowEgressFromLocalVpcToRemoteClassicLink
}
f0.PeeringOptions = f0f4
}
if resp.VpcPeeringConnection.AccepterVpcInfo.Region != nil {
f0.Region = resp.VpcPeeringConnection.AccepterVpcInfo.Region
}
if resp.VpcPeeringConnection.AccepterVpcInfo.VpcId != nil {
f0.VPCID = resp.VpcPeeringConnection.AccepterVpcInfo.VpcId
}
cr.Status.AtProvider.AccepterVPCInfo = f0
} else {
cr.Status.AtProvider.AccepterVPCInfo = nil
}
if resp.VpcPeeringConnection.ExpirationTime != nil {
cr.Status.AtProvider.ExpirationTime = &metav1.Time{
Time: *resp.VpcPeeringConnection.ExpirationTime,
}
} else {
cr.Status.AtProvider.ExpirationTime = nil
}
if resp.VpcPeeringConnection.RequesterVpcInfo != nil {
f2 := &svcapitypes.VPCPeeringConnectionVPCInfo{}
if resp.VpcPeeringConnection.RequesterVpcInfo.CidrBlock != nil {
f2.CIDRBlock = resp.VpcPeeringConnection.RequesterVpcInfo.CidrBlock
}
if resp.VpcPeeringConnection.RequesterVpcInfo.CidrBlockSet != nil {
f2f1 := []*svcapitypes.CIDRBlock{}
for _, f2f1iter := range resp.VpcPeeringConnection.RequesterVpcInfo.CidrBlockSet {
f2f1elem := &svcapitypes.CIDRBlock{}
if f2f1iter.CidrBlock != nil {
f2f1elem.CIDRBlock = f2f1iter.CidrBlock
}
f2f1 = append(f2f1, f2f1elem)
}
f2.CIDRBlockSet = f2f1
}
if resp.VpcPeeringConnection.RequesterVpcInfo.Ipv6CidrBlockSet != nil {
f2f2 := []*svcapitypes.IPv6CIDRBlock{}
for _, f2f2iter := range resp.VpcPeeringConnection.RequesterVpcInfo.Ipv6CidrBlockSet {
f2f2elem := &svcapitypes.IPv6CIDRBlock{}
if f2f2iter.Ipv6CidrBlock != nil {
f2f2elem.IPv6CIDRBlock = f2f2iter.Ipv6CidrBlock
}
f2f2 = append(f2f2, f2f2elem)
}
f2.IPv6CIDRBlockSet = f2f2
}
if resp.VpcPeeringConnection.RequesterVpcInfo.OwnerId != nil {
f2.OwnerID = resp.VpcPeeringConnection.RequesterVpcInfo.OwnerId
}
if resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions != nil {
f2f4 := &svcapitypes.VPCPeeringConnectionOptionsDescription{}
if resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowDnsResolutionFromRemoteVpc != nil {
f2f4.AllowDNSResolutionFromRemoteVPC = resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowDnsResolutionFromRemoteVpc
}
if resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowEgressFromLocalClassicLinkToRemoteVpc != nil {
f2f4.AllowEgressFromLocalClassicLinkToRemoteVPC = resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowEgressFromLocalClassicLinkToRemoteVpc
}
if resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowEgressFromLocalVpcToRemoteClassicLink != nil {
f2f4.AllowEgressFromLocalVPCToRemoteClassicLink = resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowEgressFromLocalVpcToRemoteClassicLink
}
f2.PeeringOptions = f2f4
}
if resp.VpcPeeringConnection.RequesterVpcInfo.Region != nil {
f2.Region = resp.VpcPeeringConnection.RequesterVpcInfo.Region
}
if resp.VpcPeeringConnection.RequesterVpcInfo.VpcId != nil {
f2.VPCID = resp.VpcPeeringConnection.RequesterVpcInfo.VpcId
}
cr.Status.AtProvider.RequesterVPCInfo = f2
} else {
cr.Status.AtProvider.RequesterVPCInfo = nil
}
if resp.VpcPeeringConnection.Status != nil {
f3 := &svcapitypes.VPCPeeringConnectionStateReason{}
f3.Code = aws.String(string(resp.VpcPeeringConnection.Status.Code))
if resp.VpcPeeringConnection.Status.Message != nil {
f3.Message = resp.VpcPeeringConnection.Status.Message
}
cr.Status.AtProvider.Status = f3
} else {
cr.Status.AtProvider.Status = nil
}
if resp.VpcPeeringConnection.Tags != nil {
f4 := []*svcapitypes.Tag{}
for _, f4iter := range resp.VpcPeeringConnection.Tags {
f4elem := &svcapitypes.Tag{}
if f4iter.Key != nil {
f4elem.Key = f4iter.Key
}
if f4iter.Value != nil {
f4elem.Value = f4iter.Value
}
f4 = append(f4, f4elem)
}
cr.Status.AtProvider.Tags = f4
} else {
cr.Status.AtProvider.Tags = nil
}
if resp.VpcPeeringConnection.VpcPeeringConnectionId != nil {
cr.Status.AtProvider.VPCPeeringConnectionID = resp.VpcPeeringConnection.VpcPeeringConnectionId
} else {
cr.Status.AtProvider.VPCPeeringConnectionID = nil
}
zjj2wry marked this conversation as resolved.
Show resolved Hide resolved
meta.SetExternalName(cr, aws.StringValue(resp.VpcPeeringConnection.VpcPeeringConnectionId))

return managed.ExternalCreation{}, nil
return managed.ExternalCreation{ExternalNameAssigned: true}, nil
}

func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.ExternalUpdate, error) { // nolint:gocyclo
cr, ok := mg.(*svcapitypes.VPCPeeringConnection)
if !ok {
return managed.ExternalUpdate{}, errors.New(errUnexpectedObject)
}

_, routeTableReady := cr.GetAnnotations()[routeTableEnsured]
_, hostZoneReady := cr.GetAnnotations()[hostedZoneEnsured]
_, attributeReady := cr.GetAnnotations()[attributeModified]

if !attributeReady {
if !attributeReady && cr.Status.AtProvider.VPCPeeringConnectionID != nil {
modifyVpcPeeringConnectionOptionsInput := &ec2.ModifyVpcPeeringConnectionOptionsInput{
VpcPeeringConnectionId: cr.Status.AtProvider.VPCPeeringConnectionID,
RequesterPeeringConnectionOptions: &ec2.PeeringConnectionOptionsRequest{
Expand All @@ -396,8 +272,10 @@ func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.E
cr.Annotations[attributeModified] = "true"
err = e.kube.Update(ctx, cr)
if err != nil {
return managed.ExternalUpdate{}, awsclient.Wrap(err, "error update peering annotations")
return managed.ExternalUpdate{}, errors.Wrap(err, "error update peering annotations")
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Modify VpcPeeringConnection successful")
}

if !routeTableReady && cr.Status.AtProvider.VPCPeeringConnectionID != nil {
Expand All @@ -413,24 +291,38 @@ func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.E
}
routeTablesRes, err := e.client.DescribeRouteTablesRequest(describeRouteTablesInput).Send(ctx)
if err != nil {
return managed.ExternalUpdate{}, awsclient.Wrap(err, errDescribeRouteTable)
return managed.ExternalUpdate{}, errors.Wrap(err, errDescribeRouteTable)
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Describe RouteTables for creating")

for _, rt := range routeTablesRes.RouteTables {
createRouteInput := &ec2.CreateRouteInput{
RouteTableId: rt.RouteTableId,
DestinationCidrBlock: cr.Spec.ForProvider.PeerCIDR,
VpcPeeringConnectionId: cr.Status.AtProvider.VPCPeeringConnectionID,
}
createRouteRes, err := e.client.CreateRouteRequest(createRouteInput).Send(ctx)
_, err := e.client.CreateRouteRequest(createRouteInput).Send(ctx)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() != "RouteAlreadyExists" {
return managed.ExternalUpdate{}, awsclient.Wrap(err, errCreate)
// FIXME: The error is not aws.Err type?
if !strings.Contains(err.Error(), "RouteAlreadyExists") {
return managed.ExternalUpdate{}, errors.Wrap(err, "create route for vpc peering")
} else {

for _, route := range rt.Routes {
// The route identified by DestinationCidrBlock, if route table already have DestinationCidrBlock point to other vpc peering connetion ID, should be return error
if route.DestinationCidrBlock != nil && *route.DestinationCidrBlock == *cr.Spec.ForProvider.PeerCIDR {
if route.VpcPeeringConnectionId != nil && *route.VpcPeeringConnectionId == *cr.Status.AtProvider.VPCPeeringConnectionID {
e.log.WithValues("VpcPeering", cr.Name).Debug("Route already exist, no need to recreate", "RouteTableId", rt.RouteTableId, "DestinationCidrBlock", *route.DestinationCidrBlock)
continue
} else {
return managed.ExternalUpdate{}, errors.Wrap(err, fmt.Sprintf("failed add route for vpc peering connection: %s, routeID: %s", *cr.Status.AtProvider.VPCPeeringConnectionID, *rt.RouteTableId))
}
}
}
}
} else {
e.log.Info("Create route for route table", "RouteTableID", *rt.RouteTableId, "return", *createRouteRes.Return)
e.log.WithValues("VpcPeering", cr.Name).Debug("Create Route successful", "RouteTableId", rt.RouteTableId)
}
}
if cr.Annotations == nil {
Expand All @@ -439,7 +331,7 @@ func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.E
cr.Annotations[routeTableEnsured] = "true"
err = e.kube.Update(ctx, cr)
if err != nil {
return managed.ExternalUpdate{}, awsclient.Wrap(err, "error update peering annotations")
return managed.ExternalUpdate{}, errors.Wrap(err, "error update peering annotations")
}
}

Expand All @@ -455,14 +347,17 @@ func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.E
if err != nil && !isAlreadyCreated(err) {
return managed.ExternalUpdate{}, errors.Wrap(err, errCreateHostzone)
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Create VPCAssociationAuthorization successful")

if cr.Annotations == nil {
cr.Annotations = map[string]string{}
}
cr.Annotations[hostedZoneEnsured] = "true"
err = e.kube.Update(ctx, cr)

if err != nil {
return managed.ExternalUpdate{}, awsclient.Wrap(err, "error update peering annotations")
return managed.ExternalUpdate{}, errors.Wrap(err, "error update peering annotations")
}
}

Expand Down Expand Up @@ -494,9 +389,10 @@ func (e *external) Delete(ctx context.Context, mg cpresource.Managed) error { //
VPCRegion: route53.VPCRegion(*cr.Spec.ForProvider.PeerRegion),
},
}).Send(ctx)
if err != nil {
e.log.Info("delete VPCAssociationAuthorization failed", "error", err)
if err != nil && !strings.Contains(err.Error(), "VPCAssociationAuthorizationNotFound") {
return errors.Wrap(err, "delete VPCAssociationAuthorization")
}
e.log.WithValues("VpcPeering", cr.Name).Debug("Delete VPCAssociationAuthorization successful")

filter := ec2.Filter{
Name: aws.String("vpc-id"),
Expand All @@ -505,14 +401,14 @@ func (e *external) Delete(ctx context.Context, mg cpresource.Managed) error { //
},
}
describeRouteTablesInput := &ec2.DescribeRouteTablesInput{
Filters: []ec2.Filter{filter},
MaxResults: aws.Int64(10),
Filters: []ec2.Filter{filter},
}
routeTablesRes, err := e.client.DescribeRouteTablesRequest(describeRouteTablesInput).Send(ctx)
if err != nil {
return err
return errors.Wrap(err, "describe RouteTables")
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Describe RouteTables for deleting", "result", routeTablesRes.String())
for _, rt := range routeTablesRes.RouteTables {
for _, r := range rt.Routes {
if r.VpcPeeringConnectionId != nil && cr.Status.AtProvider.VPCPeeringConnectionID != nil && *r.VpcPeeringConnectionId == *cr.Status.AtProvider.VPCPeeringConnectionID {
Expand All @@ -522,15 +418,16 @@ func (e *external) Delete(ctx context.Context, mg cpresource.Managed) error { //
RouteTableId: rt.RouteTableId,
}).Send(ctx)
if err != nil {
return err
return errors.Wrap(err, "delete Route")
}
e.log.WithValues("VpcPeering", cr.Name).Debug("Delete route successful", "RouteTableId", rt.RouteTableId)
}
}
}

err = e.deleteVPCPeeringConnection(ctx, cr)

return err
return errors.Wrap(err, "delete VPCPeeringConnection")
}

func isAWSErr(err error, code string, message string) bool {
Expand All @@ -554,8 +451,9 @@ func (e *external) deleteVPCPeeringConnection(ctx context.Context, cr *svcapityp
}).Send(ctx)

if err != nil && !isAWSErr(err, "InvalidVpcPeeringConnectionID.NotFound", "") {
return awsclient.Wrap(err, "errDelete")
return errors.Wrap(err, "delete vpc peering connection")
}
e.log.WithValues("VpcPeering", cr.Name).Debug("Delete VpcPeeringConnection successful")
}
}

Expand Down
Loading