Skip to content

Commit

Permalink
Fix vpc connetion can not got ready (#25)
Browse files Browse the repository at this point in the history
* Fix vpc connetion can not got ready

* remove debug log

* deprecated aws error

* fix ut

* address comments

* remove debug log

* fix bug

* rollback code
  • Loading branch information
zjj2wry authored and tangenti committed Jan 19, 2022
1 parent b35d70a commit 66416fa
Show file tree
Hide file tree
Showing 2 changed files with 266 additions and 170 deletions.
238 changes: 68 additions & 170 deletions pkg/controller/vpcpeering/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package vpcpeering

import (
"context"
"fmt"
"reflect"
"strings"

"github.com/aws/aws-sdk-go-v2/service/ec2"
Expand Down Expand Up @@ -61,7 +63,8 @@ func SetupVPCPeeringConnection(mgr ctrl.Manager, l logging.Logger, rl workqueue.
return ctrl.NewControllerManagedBy(mgr).
Named(name).
WithOptions(controller.Options{
RateLimiter: ratelimiter.NewDefaultManagedRateLimiter(rl),
RateLimiter: ratelimiter.NewDefaultManagedRateLimiter(rl),
MaxConcurrentReconciles: 5,
}).
For(&svcapitypes.VPCPeeringConnection{}).
Complete(managed.NewReconciler(mgr,
Expand Down Expand Up @@ -148,11 +151,13 @@ func (e *external) Observe(ctx context.Context, mg cpresource.Managed) (managed.
input := peering.GenerateDescribeVpcPeeringConnectionsInput(cr)
resp, err := e.client.DescribeVpcPeeringConnectionsRequest(input).Send(ctx)
if err != nil {
return managed.ExternalObservation{ResourceExists: false}, awsclient.Wrap(err, errDescribe)
return managed.ExternalObservation{ResourceExists: false}, errors.Wrap(err, errDescribe)
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Describe VpcPeeringConnections")
// TODO: if user delete vpc peering in aws cloud, how ensure subresource deleted
if len(resp.VpcPeeringConnections) == 0 {
return managed.ExternalObservation{ResourceExists: true}, nil
return managed.ExternalObservation{ResourceExists: false}, nil
}

existedPeer := resp.VpcPeeringConnections[0]
Expand All @@ -161,6 +166,18 @@ func (e *external) Observe(ctx context.Context, mg cpresource.Managed) (managed.
return managed.ExternalObservation{ResourceExists: false}, nil
}

currentPeeringStatus := peering.BuildPeering(resp).Status.AtProvider

e.log.WithValues("VpcPeering", cr.Name).Debug("Build current peering status")

// update current peering status to status.atProvider
if !reflect.DeepEqual(currentPeeringStatus, cr.Status.AtProvider) {
currentPeeringStatus.DeepCopyInto(&cr.Status.AtProvider)
if err := e.kube.Status().Update(ctx, cr); err != nil {
return managed.ExternalObservation{ResourceExists: true}, err
}
}

_, routeTableReady := cr.GetAnnotations()[routeTableEnsured]
_, hostZoneReady := cr.GetAnnotations()[hostedZoneEnsured]
_, attributeReady := cr.GetAnnotations()[attributeModified]
Expand All @@ -172,7 +189,6 @@ func (e *external) Observe(ctx context.Context, mg cpresource.Managed) (managed.
}, errors.Wrap(e.kube.Status().Update(ctx, cr), errUpdateManagedStatus)
}

peering.BuildPeering(resp).Status.AtProvider.DeepCopyInto(&cr.Status.AtProvider)
if existedPeer.Status.Code == ec2.VpcPeeringConnectionStateReasonCodeActive && cr.GetCondition(ApprovedCondition).Status == corev1.ConditionTrue {
cr.Status.SetConditions(xpv1.Available())
}
Expand All @@ -194,9 +210,11 @@ func (e *external) Create(ctx context.Context, mg cpresource.Managed) (managed.E

resp, err := e.client.CreateVpcPeeringConnectionRequest(input).Send(ctx)
if err != nil {
return managed.ExternalCreation{}, awsclient.Wrap(err, errCreate)
return managed.ExternalCreation{}, errors.Wrap(err, "create VpcPeeringConnection")
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Create VpcPeeringConnectio successful")

tags := make([]ec2.Tag, 0)
tags = append(tags, ec2.Tag{
Key: aws.String("Name"),
Expand All @@ -217,169 +235,27 @@ func (e *external) Create(ctx context.Context, mg cpresource.Managed) (managed.E
Tags: tags,
}).Send(ctx)
if err != nil {
return managed.ExternalCreation{}, awsclient.Wrap(err, errCreate)
return managed.ExternalCreation{}, errors.Wrap(err, "create tag for vpc peering")
}

meta.SetExternalName(cr, aws.StringValue(resp.VpcPeeringConnection.VpcPeeringConnectionId))
e.log.WithValues("VpcPeering", cr.Name).Debug("Create tag for vpc peering successful")

if resp.VpcPeeringConnection.AccepterVpcInfo != nil {
f0 := &svcapitypes.VPCPeeringConnectionVPCInfo{}
if resp.VpcPeeringConnection.AccepterVpcInfo.CidrBlock != nil {
f0.CIDRBlock = resp.VpcPeeringConnection.AccepterVpcInfo.CidrBlock
}
if resp.VpcPeeringConnection.AccepterVpcInfo.CidrBlockSet != nil {
f0f1 := []*svcapitypes.CIDRBlock{}
for _, f0f1iter := range resp.VpcPeeringConnection.AccepterVpcInfo.CidrBlockSet {
f0f1elem := &svcapitypes.CIDRBlock{}
if f0f1iter.CidrBlock != nil {
f0f1elem.CIDRBlock = f0f1iter.CidrBlock
}
f0f1 = append(f0f1, f0f1elem)
}
f0.CIDRBlockSet = f0f1
}
if resp.VpcPeeringConnection.AccepterVpcInfo.Ipv6CidrBlockSet != nil {
f0f2 := []*svcapitypes.IPv6CIDRBlock{}
for _, f0f2iter := range resp.VpcPeeringConnection.AccepterVpcInfo.Ipv6CidrBlockSet {
f0f2elem := &svcapitypes.IPv6CIDRBlock{}
if f0f2iter.Ipv6CidrBlock != nil {
f0f2elem.IPv6CIDRBlock = f0f2iter.Ipv6CidrBlock
}
f0f2 = append(f0f2, f0f2elem)
}
f0.IPv6CIDRBlockSet = f0f2
}
if resp.VpcPeeringConnection.AccepterVpcInfo.OwnerId != nil {
f0.OwnerID = resp.VpcPeeringConnection.AccepterVpcInfo.OwnerId
}
if resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions != nil {
f0f4 := &svcapitypes.VPCPeeringConnectionOptionsDescription{}
if resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowDnsResolutionFromRemoteVpc != nil {
f0f4.AllowDNSResolutionFromRemoteVPC = resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowDnsResolutionFromRemoteVpc
}
if resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowEgressFromLocalClassicLinkToRemoteVpc != nil {
f0f4.AllowEgressFromLocalClassicLinkToRemoteVPC = resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowEgressFromLocalClassicLinkToRemoteVpc
}
if resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowEgressFromLocalVpcToRemoteClassicLink != nil {
f0f4.AllowEgressFromLocalVPCToRemoteClassicLink = resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowEgressFromLocalVpcToRemoteClassicLink
}
f0.PeeringOptions = f0f4
}
if resp.VpcPeeringConnection.AccepterVpcInfo.Region != nil {
f0.Region = resp.VpcPeeringConnection.AccepterVpcInfo.Region
}
if resp.VpcPeeringConnection.AccepterVpcInfo.VpcId != nil {
f0.VPCID = resp.VpcPeeringConnection.AccepterVpcInfo.VpcId
}
cr.Status.AtProvider.AccepterVPCInfo = f0
} else {
cr.Status.AtProvider.AccepterVPCInfo = nil
}
if resp.VpcPeeringConnection.ExpirationTime != nil {
cr.Status.AtProvider.ExpirationTime = &metav1.Time{
Time: *resp.VpcPeeringConnection.ExpirationTime,
}
} else {
cr.Status.AtProvider.ExpirationTime = nil
}
if resp.VpcPeeringConnection.RequesterVpcInfo != nil {
f2 := &svcapitypes.VPCPeeringConnectionVPCInfo{}
if resp.VpcPeeringConnection.RequesterVpcInfo.CidrBlock != nil {
f2.CIDRBlock = resp.VpcPeeringConnection.RequesterVpcInfo.CidrBlock
}
if resp.VpcPeeringConnection.RequesterVpcInfo.CidrBlockSet != nil {
f2f1 := []*svcapitypes.CIDRBlock{}
for _, f2f1iter := range resp.VpcPeeringConnection.RequesterVpcInfo.CidrBlockSet {
f2f1elem := &svcapitypes.CIDRBlock{}
if f2f1iter.CidrBlock != nil {
f2f1elem.CIDRBlock = f2f1iter.CidrBlock
}
f2f1 = append(f2f1, f2f1elem)
}
f2.CIDRBlockSet = f2f1
}
if resp.VpcPeeringConnection.RequesterVpcInfo.Ipv6CidrBlockSet != nil {
f2f2 := []*svcapitypes.IPv6CIDRBlock{}
for _, f2f2iter := range resp.VpcPeeringConnection.RequesterVpcInfo.Ipv6CidrBlockSet {
f2f2elem := &svcapitypes.IPv6CIDRBlock{}
if f2f2iter.Ipv6CidrBlock != nil {
f2f2elem.IPv6CIDRBlock = f2f2iter.Ipv6CidrBlock
}
f2f2 = append(f2f2, f2f2elem)
}
f2.IPv6CIDRBlockSet = f2f2
}
if resp.VpcPeeringConnection.RequesterVpcInfo.OwnerId != nil {
f2.OwnerID = resp.VpcPeeringConnection.RequesterVpcInfo.OwnerId
}
if resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions != nil {
f2f4 := &svcapitypes.VPCPeeringConnectionOptionsDescription{}
if resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowDnsResolutionFromRemoteVpc != nil {
f2f4.AllowDNSResolutionFromRemoteVPC = resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowDnsResolutionFromRemoteVpc
}
if resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowEgressFromLocalClassicLinkToRemoteVpc != nil {
f2f4.AllowEgressFromLocalClassicLinkToRemoteVPC = resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowEgressFromLocalClassicLinkToRemoteVpc
}
if resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowEgressFromLocalVpcToRemoteClassicLink != nil {
f2f4.AllowEgressFromLocalVPCToRemoteClassicLink = resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowEgressFromLocalVpcToRemoteClassicLink
}
f2.PeeringOptions = f2f4
}
if resp.VpcPeeringConnection.RequesterVpcInfo.Region != nil {
f2.Region = resp.VpcPeeringConnection.RequesterVpcInfo.Region
}
if resp.VpcPeeringConnection.RequesterVpcInfo.VpcId != nil {
f2.VPCID = resp.VpcPeeringConnection.RequesterVpcInfo.VpcId
}
cr.Status.AtProvider.RequesterVPCInfo = f2
} else {
cr.Status.AtProvider.RequesterVPCInfo = nil
}
if resp.VpcPeeringConnection.Status != nil {
f3 := &svcapitypes.VPCPeeringConnectionStateReason{}
f3.Code = aws.String(string(resp.VpcPeeringConnection.Status.Code))
if resp.VpcPeeringConnection.Status.Message != nil {
f3.Message = resp.VpcPeeringConnection.Status.Message
}
cr.Status.AtProvider.Status = f3
} else {
cr.Status.AtProvider.Status = nil
}
if resp.VpcPeeringConnection.Tags != nil {
f4 := []*svcapitypes.Tag{}
for _, f4iter := range resp.VpcPeeringConnection.Tags {
f4elem := &svcapitypes.Tag{}
if f4iter.Key != nil {
f4elem.Key = f4iter.Key
}
if f4iter.Value != nil {
f4elem.Value = f4iter.Value
}
f4 = append(f4, f4elem)
}
cr.Status.AtProvider.Tags = f4
} else {
cr.Status.AtProvider.Tags = nil
}
if resp.VpcPeeringConnection.VpcPeeringConnectionId != nil {
cr.Status.AtProvider.VPCPeeringConnectionID = resp.VpcPeeringConnection.VpcPeeringConnectionId
} else {
cr.Status.AtProvider.VPCPeeringConnectionID = nil
}
meta.SetExternalName(cr, aws.StringValue(resp.VpcPeeringConnection.VpcPeeringConnectionId))

return managed.ExternalCreation{}, nil
return managed.ExternalCreation{ExternalNameAssigned: true}, nil
}

func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.ExternalUpdate, error) { // nolint:gocyclo
cr, ok := mg.(*svcapitypes.VPCPeeringConnection)
if !ok {
return managed.ExternalUpdate{}, errors.New(errUnexpectedObject)
}

_, routeTableReady := cr.GetAnnotations()[routeTableEnsured]
_, hostZoneReady := cr.GetAnnotations()[hostedZoneEnsured]
_, attributeReady := cr.GetAnnotations()[attributeModified]

if !attributeReady {
if !attributeReady && cr.Status.AtProvider.VPCPeeringConnectionID != nil {
modifyVpcPeeringConnectionOptionsInput := &ec2.ModifyVpcPeeringConnectionOptionsInput{
VpcPeeringConnectionId: cr.Status.AtProvider.VPCPeeringConnectionID,
RequesterPeeringConnectionOptions: &ec2.PeeringConnectionOptionsRequest{
Expand All @@ -396,8 +272,10 @@ func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.E
cr.Annotations[attributeModified] = "true"
err = e.kube.Update(ctx, cr)
if err != nil {
return managed.ExternalUpdate{}, awsclient.Wrap(err, "error update peering annotations")
return managed.ExternalUpdate{}, errors.Wrap(err, "error update peering annotations")
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Modify VpcPeeringConnection successful")
}

if !routeTableReady && cr.Status.AtProvider.VPCPeeringConnectionID != nil {
Expand All @@ -413,24 +291,38 @@ func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.E
}
routeTablesRes, err := e.client.DescribeRouteTablesRequest(describeRouteTablesInput).Send(ctx)
if err != nil {
return managed.ExternalUpdate{}, awsclient.Wrap(err, errDescribeRouteTable)
return managed.ExternalUpdate{}, errors.Wrap(err, errDescribeRouteTable)
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Describe RouteTables for creating")

for _, rt := range routeTablesRes.RouteTables {
createRouteInput := &ec2.CreateRouteInput{
RouteTableId: rt.RouteTableId,
DestinationCidrBlock: cr.Spec.ForProvider.PeerCIDR,
VpcPeeringConnectionId: cr.Status.AtProvider.VPCPeeringConnectionID,
}
createRouteRes, err := e.client.CreateRouteRequest(createRouteInput).Send(ctx)
_, err := e.client.CreateRouteRequest(createRouteInput).Send(ctx)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() != "RouteAlreadyExists" {
return managed.ExternalUpdate{}, awsclient.Wrap(err, errCreate)
// FIXME: The error is not aws.Err type?
if !strings.Contains(err.Error(), "RouteAlreadyExists") {
return managed.ExternalUpdate{}, errors.Wrap(err, "create route for vpc peering")
} else {

for _, route := range rt.Routes {
// The route identified by DestinationCidrBlock, if route table already have DestinationCidrBlock point to other vpc peering connetion ID, should be return error
if route.DestinationCidrBlock != nil && *route.DestinationCidrBlock == *cr.Spec.ForProvider.PeerCIDR {
if route.VpcPeeringConnectionId != nil && *route.VpcPeeringConnectionId == *cr.Status.AtProvider.VPCPeeringConnectionID {
e.log.WithValues("VpcPeering", cr.Name).Debug("Route already exist, no need to recreate", "RouteTableId", rt.RouteTableId, "DestinationCidrBlock", *route.DestinationCidrBlock)
continue
} else {
return managed.ExternalUpdate{}, errors.Wrap(err, fmt.Sprintf("failed add route for vpc peering connection: %s, routeID: %s", *cr.Status.AtProvider.VPCPeeringConnectionID, *rt.RouteTableId))
}
}
}
}
} else {
e.log.Info("Create route for route table", "RouteTableID", *rt.RouteTableId, "return", *createRouteRes.Return)
e.log.WithValues("VpcPeering", cr.Name).Debug("Create Route successful", "RouteTableId", rt.RouteTableId)
}
}
if cr.Annotations == nil {
Expand All @@ -439,7 +331,7 @@ func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.E
cr.Annotations[routeTableEnsured] = "true"
err = e.kube.Update(ctx, cr)
if err != nil {
return managed.ExternalUpdate{}, awsclient.Wrap(err, "error update peering annotations")
return managed.ExternalUpdate{}, errors.Wrap(err, "error update peering annotations")
}
}

Expand All @@ -455,14 +347,17 @@ func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.E
if err != nil && !isAlreadyCreated(err) {
return managed.ExternalUpdate{}, errors.Wrap(err, errCreateHostzone)
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Create VPCAssociationAuthorization successful")

if cr.Annotations == nil {
cr.Annotations = map[string]string{}
}
cr.Annotations[hostedZoneEnsured] = "true"
err = e.kube.Update(ctx, cr)

if err != nil {
return managed.ExternalUpdate{}, awsclient.Wrap(err, "error update peering annotations")
return managed.ExternalUpdate{}, errors.Wrap(err, "error update peering annotations")
}
}

Expand Down Expand Up @@ -494,9 +389,10 @@ func (e *external) Delete(ctx context.Context, mg cpresource.Managed) error { //
VPCRegion: route53.VPCRegion(*cr.Spec.ForProvider.PeerRegion),
},
}).Send(ctx)
if err != nil {
e.log.Info("delete VPCAssociationAuthorization failed", "error", err)
if err != nil && !strings.Contains(err.Error(), "VPCAssociationAuthorizationNotFound") {
return errors.Wrap(err, "delete VPCAssociationAuthorization")
}
e.log.WithValues("VpcPeering", cr.Name).Debug("Delete VPCAssociationAuthorization successful")

filter := ec2.Filter{
Name: aws.String("vpc-id"),
Expand All @@ -505,14 +401,14 @@ func (e *external) Delete(ctx context.Context, mg cpresource.Managed) error { //
},
}
describeRouteTablesInput := &ec2.DescribeRouteTablesInput{
Filters: []ec2.Filter{filter},
MaxResults: aws.Int64(10),
Filters: []ec2.Filter{filter},
}
routeTablesRes, err := e.client.DescribeRouteTablesRequest(describeRouteTablesInput).Send(ctx)
if err != nil {
return err
return errors.Wrap(err, "describe RouteTables")
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Describe RouteTables for deleting", "result", routeTablesRes.String())
for _, rt := range routeTablesRes.RouteTables {
for _, r := range rt.Routes {
if r.VpcPeeringConnectionId != nil && cr.Status.AtProvider.VPCPeeringConnectionID != nil && *r.VpcPeeringConnectionId == *cr.Status.AtProvider.VPCPeeringConnectionID {
Expand All @@ -522,15 +418,16 @@ func (e *external) Delete(ctx context.Context, mg cpresource.Managed) error { //
RouteTableId: rt.RouteTableId,
}).Send(ctx)
if err != nil {
return err
return errors.Wrap(err, "delete Route")
}
e.log.WithValues("VpcPeering", cr.Name).Debug("Delete route successful", "RouteTableId", rt.RouteTableId)
}
}
}

err = e.deleteVPCPeeringConnection(ctx, cr)

return err
return errors.Wrap(err, "delete VPCPeeringConnection")
}

func isAWSErr(err error, code string, message string) bool {
Expand All @@ -554,8 +451,9 @@ func (e *external) deleteVPCPeeringConnection(ctx context.Context, cr *svcapityp
}).Send(ctx)

if err != nil && !isAWSErr(err, "InvalidVpcPeeringConnectionID.NotFound", "") {
return awsclient.Wrap(err, "errDelete")
return errors.Wrap(err, "delete vpc peering connection")
}
e.log.WithValues("VpcPeering", cr.Name).Debug("Delete VpcPeeringConnection successful")
}
}

Expand Down
Loading

0 comments on commit 66416fa

Please sign in to comment.