Skip to content

Commit

Permalink
Fix vpc connetion can not got ready
Browse files Browse the repository at this point in the history
  • Loading branch information
zjj2wry committed Dec 2, 2021
1 parent 5f86a45 commit b2757f4
Showing 1 changed file with 61 additions and 169 deletions.
230 changes: 61 additions & 169 deletions pkg/controller/vpcpeering/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package vpcpeering

import (
"context"
"fmt"
"reflect"
"strings"

"github.com/aws/aws-sdk-go-v2/service/ec2"
Expand Down Expand Up @@ -151,8 +153,9 @@ func (e *external) Observe(ctx context.Context, mg cpresource.Managed) (managed.
return managed.ExternalObservation{ResourceExists: false}, awsclient.Wrap(err, errDescribe)
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Describe VpcPeeringConnections", "result", resp.String())
if len(resp.VpcPeeringConnections) == 0 {
return managed.ExternalObservation{ResourceExists: true}, nil
return managed.ExternalObservation{ResourceExists: false}, nil
}

existedPeer := resp.VpcPeeringConnections[0]
Expand All @@ -161,6 +164,18 @@ func (e *external) Observe(ctx context.Context, mg cpresource.Managed) (managed.
return managed.ExternalObservation{ResourceExists: false}, nil
}

currentPeeringStatus := peering.BuildPeering(resp).Status.AtProvider

e.log.WithValues("VpcPeering", cr.Name).Debug("Build current peering status", "status", currentPeeringStatus)

// update current peering status to status.atProvider
if !reflect.DeepEqual(currentPeeringStatus, cr.Status.AtProvider) {
currentPeeringStatus.DeepCopyInto(&cr.Status.AtProvider)
if err := e.kube.Status().Update(ctx, cr); err != nil {
return managed.ExternalObservation{ResourceExists: true}, err
}
}

_, routeTableReady := cr.GetAnnotations()[routeTableEnsured]
_, hostZoneReady := cr.GetAnnotations()[hostedZoneEnsured]
_, attributeReady := cr.GetAnnotations()[attributeModified]
Expand All @@ -172,7 +187,6 @@ func (e *external) Observe(ctx context.Context, mg cpresource.Managed) (managed.
}, errors.Wrap(e.kube.Status().Update(ctx, cr), errUpdateManagedStatus)
}

peering.BuildPeering(resp).Status.AtProvider.DeepCopyInto(&cr.Status.AtProvider)
if existedPeer.Status.Code == ec2.VpcPeeringConnectionStateReasonCodeActive && cr.GetCondition(ApprovedCondition).Status == corev1.ConditionTrue {
cr.Status.SetConditions(xpv1.Available())
}
Expand All @@ -194,9 +208,11 @@ func (e *external) Create(ctx context.Context, mg cpresource.Managed) (managed.E

resp, err := e.client.CreateVpcPeeringConnectionRequest(input).Send(ctx)
if err != nil {
return managed.ExternalCreation{}, awsclient.Wrap(err, errCreate)
return managed.ExternalCreation{}, awsclient.Wrap(err, "create VpcPeeringConnection")
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Create VpcPeeringConnectio successful", "result", resp.String())

tags := make([]ec2.Tag, 0)
tags = append(tags, ec2.Tag{
Key: aws.String("Name"),
Expand All @@ -210,164 +226,21 @@ func (e *external) Create(ctx context.Context, mg cpresource.Managed) (managed.E
})
}

_, err = e.client.CreateTagsRequest(&ec2.CreateTagsInput{
res, err := e.client.CreateTagsRequest(&ec2.CreateTagsInput{
Resources: []string{
*resp.VpcPeeringConnection.VpcPeeringConnectionId,
},
Tags: tags,
}).Send(ctx)
if err != nil {
return managed.ExternalCreation{}, awsclient.Wrap(err, errCreate)
return managed.ExternalCreation{}, awsclient.Wrap(err, "create tag for vpc peering")
}

meta.SetExternalName(cr, aws.StringValue(resp.VpcPeeringConnection.VpcPeeringConnectionId))
e.log.WithValues("VpcPeering", cr.Name).Debug("Create tag for vpc peering successful", "result", res.String())

if resp.VpcPeeringConnection.AccepterVpcInfo != nil {
f0 := &svcapitypes.VPCPeeringConnectionVPCInfo{}
if resp.VpcPeeringConnection.AccepterVpcInfo.CidrBlock != nil {
f0.CIDRBlock = resp.VpcPeeringConnection.AccepterVpcInfo.CidrBlock
}
if resp.VpcPeeringConnection.AccepterVpcInfo.CidrBlockSet != nil {
f0f1 := []*svcapitypes.CIDRBlock{}
for _, f0f1iter := range resp.VpcPeeringConnection.AccepterVpcInfo.CidrBlockSet {
f0f1elem := &svcapitypes.CIDRBlock{}
if f0f1iter.CidrBlock != nil {
f0f1elem.CIDRBlock = f0f1iter.CidrBlock
}
f0f1 = append(f0f1, f0f1elem)
}
f0.CIDRBlockSet = f0f1
}
if resp.VpcPeeringConnection.AccepterVpcInfo.Ipv6CidrBlockSet != nil {
f0f2 := []*svcapitypes.IPv6CIDRBlock{}
for _, f0f2iter := range resp.VpcPeeringConnection.AccepterVpcInfo.Ipv6CidrBlockSet {
f0f2elem := &svcapitypes.IPv6CIDRBlock{}
if f0f2iter.Ipv6CidrBlock != nil {
f0f2elem.IPv6CIDRBlock = f0f2iter.Ipv6CidrBlock
}
f0f2 = append(f0f2, f0f2elem)
}
f0.IPv6CIDRBlockSet = f0f2
}
if resp.VpcPeeringConnection.AccepterVpcInfo.OwnerId != nil {
f0.OwnerID = resp.VpcPeeringConnection.AccepterVpcInfo.OwnerId
}
if resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions != nil {
f0f4 := &svcapitypes.VPCPeeringConnectionOptionsDescription{}
if resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowDnsResolutionFromRemoteVpc != nil {
f0f4.AllowDNSResolutionFromRemoteVPC = resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowDnsResolutionFromRemoteVpc
}
if resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowEgressFromLocalClassicLinkToRemoteVpc != nil {
f0f4.AllowEgressFromLocalClassicLinkToRemoteVPC = resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowEgressFromLocalClassicLinkToRemoteVpc
}
if resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowEgressFromLocalVpcToRemoteClassicLink != nil {
f0f4.AllowEgressFromLocalVPCToRemoteClassicLink = resp.VpcPeeringConnection.AccepterVpcInfo.PeeringOptions.AllowEgressFromLocalVpcToRemoteClassicLink
}
f0.PeeringOptions = f0f4
}
if resp.VpcPeeringConnection.AccepterVpcInfo.Region != nil {
f0.Region = resp.VpcPeeringConnection.AccepterVpcInfo.Region
}
if resp.VpcPeeringConnection.AccepterVpcInfo.VpcId != nil {
f0.VPCID = resp.VpcPeeringConnection.AccepterVpcInfo.VpcId
}
cr.Status.AtProvider.AccepterVPCInfo = f0
} else {
cr.Status.AtProvider.AccepterVPCInfo = nil
}
if resp.VpcPeeringConnection.ExpirationTime != nil {
cr.Status.AtProvider.ExpirationTime = &metav1.Time{
Time: *resp.VpcPeeringConnection.ExpirationTime,
}
} else {
cr.Status.AtProvider.ExpirationTime = nil
}
if resp.VpcPeeringConnection.RequesterVpcInfo != nil {
f2 := &svcapitypes.VPCPeeringConnectionVPCInfo{}
if resp.VpcPeeringConnection.RequesterVpcInfo.CidrBlock != nil {
f2.CIDRBlock = resp.VpcPeeringConnection.RequesterVpcInfo.CidrBlock
}
if resp.VpcPeeringConnection.RequesterVpcInfo.CidrBlockSet != nil {
f2f1 := []*svcapitypes.CIDRBlock{}
for _, f2f1iter := range resp.VpcPeeringConnection.RequesterVpcInfo.CidrBlockSet {
f2f1elem := &svcapitypes.CIDRBlock{}
if f2f1iter.CidrBlock != nil {
f2f1elem.CIDRBlock = f2f1iter.CidrBlock
}
f2f1 = append(f2f1, f2f1elem)
}
f2.CIDRBlockSet = f2f1
}
if resp.VpcPeeringConnection.RequesterVpcInfo.Ipv6CidrBlockSet != nil {
f2f2 := []*svcapitypes.IPv6CIDRBlock{}
for _, f2f2iter := range resp.VpcPeeringConnection.RequesterVpcInfo.Ipv6CidrBlockSet {
f2f2elem := &svcapitypes.IPv6CIDRBlock{}
if f2f2iter.Ipv6CidrBlock != nil {
f2f2elem.IPv6CIDRBlock = f2f2iter.Ipv6CidrBlock
}
f2f2 = append(f2f2, f2f2elem)
}
f2.IPv6CIDRBlockSet = f2f2
}
if resp.VpcPeeringConnection.RequesterVpcInfo.OwnerId != nil {
f2.OwnerID = resp.VpcPeeringConnection.RequesterVpcInfo.OwnerId
}
if resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions != nil {
f2f4 := &svcapitypes.VPCPeeringConnectionOptionsDescription{}
if resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowDnsResolutionFromRemoteVpc != nil {
f2f4.AllowDNSResolutionFromRemoteVPC = resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowDnsResolutionFromRemoteVpc
}
if resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowEgressFromLocalClassicLinkToRemoteVpc != nil {
f2f4.AllowEgressFromLocalClassicLinkToRemoteVPC = resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowEgressFromLocalClassicLinkToRemoteVpc
}
if resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowEgressFromLocalVpcToRemoteClassicLink != nil {
f2f4.AllowEgressFromLocalVPCToRemoteClassicLink = resp.VpcPeeringConnection.RequesterVpcInfo.PeeringOptions.AllowEgressFromLocalVpcToRemoteClassicLink
}
f2.PeeringOptions = f2f4
}
if resp.VpcPeeringConnection.RequesterVpcInfo.Region != nil {
f2.Region = resp.VpcPeeringConnection.RequesterVpcInfo.Region
}
if resp.VpcPeeringConnection.RequesterVpcInfo.VpcId != nil {
f2.VPCID = resp.VpcPeeringConnection.RequesterVpcInfo.VpcId
}
cr.Status.AtProvider.RequesterVPCInfo = f2
} else {
cr.Status.AtProvider.RequesterVPCInfo = nil
}
if resp.VpcPeeringConnection.Status != nil {
f3 := &svcapitypes.VPCPeeringConnectionStateReason{}
f3.Code = aws.String(string(resp.VpcPeeringConnection.Status.Code))
if resp.VpcPeeringConnection.Status.Message != nil {
f3.Message = resp.VpcPeeringConnection.Status.Message
}
cr.Status.AtProvider.Status = f3
} else {
cr.Status.AtProvider.Status = nil
}
if resp.VpcPeeringConnection.Tags != nil {
f4 := []*svcapitypes.Tag{}
for _, f4iter := range resp.VpcPeeringConnection.Tags {
f4elem := &svcapitypes.Tag{}
if f4iter.Key != nil {
f4elem.Key = f4iter.Key
}
if f4iter.Value != nil {
f4elem.Value = f4iter.Value
}
f4 = append(f4, f4elem)
}
cr.Status.AtProvider.Tags = f4
} else {
cr.Status.AtProvider.Tags = nil
}
if resp.VpcPeeringConnection.VpcPeeringConnectionId != nil {
cr.Status.AtProvider.VPCPeeringConnectionID = resp.VpcPeeringConnection.VpcPeeringConnectionId
} else {
cr.Status.AtProvider.VPCPeeringConnectionID = nil
}
meta.SetExternalName(cr, aws.StringValue(resp.VpcPeeringConnection.VpcPeeringConnectionId))

return managed.ExternalCreation{}, nil
return managed.ExternalCreation{ExternalNameAssigned: true}, nil
}

func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.ExternalUpdate, error) { // nolint:gocyclo
Expand All @@ -379,16 +252,16 @@ func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.E
_, hostZoneReady := cr.GetAnnotations()[hostedZoneEnsured]
_, attributeReady := cr.GetAnnotations()[attributeModified]

if !attributeReady {
if !attributeReady && cr.Status.AtProvider.VPCPeeringConnectionID != nil {
modifyVpcPeeringConnectionOptionsInput := &ec2.ModifyVpcPeeringConnectionOptionsInput{
VpcPeeringConnectionId: cr.Status.AtProvider.VPCPeeringConnectionID,
RequesterPeeringConnectionOptions: &ec2.PeeringConnectionOptionsRequest{
AllowDnsResolutionFromRemoteVpc: aws.Bool(true),
},
}
_, err := e.client.ModifyVpcPeeringConnectionOptionsRequest(modifyVpcPeeringConnectionOptionsInput).Send(ctx)
res, err := e.client.ModifyVpcPeeringConnectionOptionsRequest(modifyVpcPeeringConnectionOptionsInput).Send(ctx)
if err != nil {
return managed.ExternalUpdate{}, errors.Wrap(err, errModifyVpcPeering)
return managed.ExternalUpdate{}, awsclient.Wrap(err, errModifyVpcPeering)
}
if cr.Annotations == nil {
cr.Annotations = map[string]string{}
Expand All @@ -398,6 +271,8 @@ func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.E
if err != nil {
return managed.ExternalUpdate{}, awsclient.Wrap(err, "error update peering annotations")
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Modify VpcPeeringConnection successful", "result", res.String())
}

if !routeTableReady && cr.Status.AtProvider.VPCPeeringConnectionID != nil {
Expand All @@ -416,6 +291,8 @@ func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.E
return managed.ExternalUpdate{}, awsclient.Wrap(err, errDescribeRouteTable)
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Describe RouteTables for creating", "result", routeTablesRes.String())

for _, rt := range routeTablesRes.RouteTables {
createRouteInput := &ec2.CreateRouteInput{
RouteTableId: rt.RouteTableId,
Expand All @@ -424,13 +301,22 @@ func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.E
}
createRouteRes, err := e.client.CreateRouteRequest(createRouteInput).Send(ctx)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() != "RouteAlreadyExists" {
return managed.ExternalUpdate{}, awsclient.Wrap(err, errCreate)
// FIXME: The error is not aws.Err type?
if !strings.Contains(err.Error(), "RouteAlreadyExists") {
return managed.ExternalUpdate{}, awsclient.Wrap(err, "create route for vpc peering")
} else {
// The route identified by DestinationCidrBlock, if route table already have DestinationCidrBlock point to other vpc peering connetion ID, should be return error
for _, route := range rt.Routes {
if *route.DestinationCidrBlock == *cr.Spec.ForProvider.PeerCIDR && route.VpcPeeringConnectionId != nil && route.VpcPeeringConnectionId == cr.Status.AtProvider.VPCPeeringConnectionID {
e.log.WithValues("VpcPeering", cr.Name).Debug("Route already exist, no need to recreate", "RouteTableId", rt.RouteTableId, "DestinationCidrBlock", *route.DestinationCidrBlock)
continue
} else {
return managed.ExternalUpdate{}, awsclient.Wrap(err, fmt.Sprintf("failed add route for vpc peering connection: %s, routeID: %s", *cr.Status.AtProvider.VPCPeeringConnectionID, *rt.RouteTableId))
}
}
}
} else {
e.log.Info("Create route for route table", "RouteTableID", *rt.RouteTableId, "return", *createRouteRes.Return)
e.log.WithValues("VpcPeering", cr.Name).Debug("Create Route successful", "RouteTableId", rt.RouteTableId, "result", createRouteRes.String())
}
}
if cr.Annotations == nil {
Expand All @@ -451,10 +337,13 @@ func (e *external) Update(ctx context.Context, mg cpresource.Managed) (managed.E
VPCRegion: route53.VPCRegion(*cr.Spec.ForProvider.PeerRegion),
},
}
_, err := e.route53Client.CreateVPCAssociationAuthorizationRequest(vpcAssociationAuthorizationInput).Send(ctx)
res, err := e.route53Client.CreateVPCAssociationAuthorizationRequest(vpcAssociationAuthorizationInput).Send(ctx)
if err != nil && !isAlreadyCreated(err) {
return managed.ExternalUpdate{}, errors.Wrap(err, errCreateHostzone)
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Create VPCAssociationAuthorization successful", "result", res.String())

if cr.Annotations == nil {
cr.Annotations = map[string]string{}
}
Expand Down Expand Up @@ -487,16 +376,17 @@ func (e *external) Delete(ctx context.Context, mg cpresource.Managed) error { //
}
cr.Status.SetConditions(xpv1.Deleting())

_, err := e.route53Client.DeleteVPCAssociationAuthorizationRequest(&route53.DeleteVPCAssociationAuthorizationInput{
res, err := e.route53Client.DeleteVPCAssociationAuthorizationRequest(&route53.DeleteVPCAssociationAuthorizationInput{
HostedZoneId: cr.Spec.ForProvider.HostZoneID,
VPC: &route53.VPC{
VPCId: cr.Spec.ForProvider.PeerVPCID,
VPCRegion: route53.VPCRegion(*cr.Spec.ForProvider.PeerRegion),
},
}).Send(ctx)
if err != nil {
e.log.Info("delete VPCAssociationAuthorization failed", "error", err)
return awsclient.Wrap(err, "delete VPCAssociationAuthorization")
}
e.log.WithValues("VpcPeering", cr.Name).Debug("Delete VPCAssociationAuthorization successful", "result", res.String())

filter := ec2.Filter{
Name: aws.String("vpc-id"),
Expand All @@ -505,32 +395,33 @@ func (e *external) Delete(ctx context.Context, mg cpresource.Managed) error { //
},
}
describeRouteTablesInput := &ec2.DescribeRouteTablesInput{
Filters: []ec2.Filter{filter},
MaxResults: aws.Int64(10),
Filters: []ec2.Filter{filter},
}
routeTablesRes, err := e.client.DescribeRouteTablesRequest(describeRouteTablesInput).Send(ctx)
if err != nil {
return err
return awsclient.Wrap(err, "describe RouteTables")
}

e.log.WithValues("VpcPeering", cr.Name).Debug("Describe RouteTables for deleting", "result", routeTablesRes.String())
for _, rt := range routeTablesRes.RouteTables {
for _, r := range rt.Routes {
if r.VpcPeeringConnectionId != nil && cr.Status.AtProvider.VPCPeeringConnectionID != nil && *r.VpcPeeringConnectionId == *cr.Status.AtProvider.VPCPeeringConnectionID {
_, err := e.client.DeleteRouteRequest(&ec2.DeleteRouteInput{
res, err := e.client.DeleteRouteRequest(&ec2.DeleteRouteInput{
DestinationCidrBlock: cr.Spec.ForProvider.PeerCIDR,

RouteTableId: rt.RouteTableId,
}).Send(ctx)
if err != nil {
return err
return awsclient.Wrap(err, "delete Route")
}
e.log.WithValues("VpcPeering", cr.Name).Debug("Delete route successful", "RouteTableId", rt.RouteTableId, "result", res.String())
}
}
}

err = e.deleteVPCPeeringConnection(ctx, cr)

return err
return awsclient.Wrap(err, "delete VPCPeeringConnection")
}

func isAWSErr(err error, code string, message string) bool {
Expand All @@ -549,13 +440,14 @@ func (e *external) deleteVPCPeeringConnection(ctx context.Context, cr *svcapityp

for _, peering := range resp.VpcPeeringConnections {
if peering.Status.Code == ec2.VpcPeeringConnectionStateReasonCodeInitiatingRequest || peering.Status.Code == ec2.VpcPeeringConnectionStateReasonCodePendingAcceptance || peering.Status.Code == ec2.VpcPeeringConnectionStateReasonCodeActive || peering.Status.Code == ec2.VpcPeeringConnectionStateReasonCodeExpired || peering.Status.Code == ec2.VpcPeeringConnectionStateReasonCodeRejected {
_, err := e.client.DeleteVpcPeeringConnectionRequest(&ec2.DeleteVpcPeeringConnectionInput{
res, err := e.client.DeleteVpcPeeringConnectionRequest(&ec2.DeleteVpcPeeringConnectionInput{
VpcPeeringConnectionId: peering.VpcPeeringConnectionId,
}).Send(ctx)

if err != nil && !isAWSErr(err, "InvalidVpcPeeringConnectionID.NotFound", "") {
return awsclient.Wrap(err, "errDelete")
return awsclient.Wrap(err, "delete vpc peering connection")
}
e.log.WithValues("VpcPeering", cr.Name).Debug("Delete VpcPeeringConnection successful", "result", res.String())
}
}

Expand Down

0 comments on commit b2757f4

Please sign in to comment.