Skip to content

Commit

Permalink
Route53: retry single changes in a batch if the batch fails
Browse files Browse the repository at this point in the history
If a single change fails during the retry, it will be added to a queue.
In the next iteration, changes from this queue will be submitted after
all other changes.

When submitting single changes, they are always submitted as batches of
changes with the same DNS name and ownership relation to avoid
inconsistency between the record created and the TXT records.
  • Loading branch information
alfredkrohmer committed Jan 16, 2023
1 parent adf6ad7 commit 7dd84a5
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 85 deletions.
2 changes: 2 additions & 0 deletions endpoint/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
OwnerLabelKey = "owner"
// ResourceLabelKey is the name of the label that identifies k8s resource which wants to acquire the DNS name
ResourceLabelKey = "resource"
// OwnedRecordLabelKey is the name of the label that identifies the record that is owned by the labeled TXT registry record
OwnedRecordLabelKey = "ownedRecord"

// AWSSDDescriptionLabel label responsible for storing raw owner/resource combination information in the Labels
// supposed to be inserted by AWS SD Provider, and parsed into OwnerLabelKey and ResourceLabelKey key by AWS SD Registry
Expand Down
1 change: 1 addition & 0 deletions internal/testutils/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func SameEndpoint(a, b *endpoint.Endpoint) bool {
return a.DNSName == b.DNSName && a.Targets.Same(b.Targets) && a.RecordType == b.RecordType && a.SetIdentifier == b.SetIdentifier &&
a.Labels[endpoint.OwnerLabelKey] == b.Labels[endpoint.OwnerLabelKey] && a.RecordTTL == b.RecordTTL &&
a.Labels[endpoint.ResourceLabelKey] == b.Labels[endpoint.ResourceLabelKey] &&
a.Labels[endpoint.OwnedRecordLabelKey] == b.Labels[endpoint.OwnedRecordLabelKey] &&
SameProviderSpecific(a.ProviderSpecific, b.ProviderSpecific)
}

Expand Down
131 changes: 102 additions & 29 deletions provider/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ type AWSProvider struct {
zoneTagFilter provider.ZoneTagFilter
preferCNAME bool
zonesCache *zonesListCache
// queue for collecting changes to submit them in the next iteration, but after all other changes
failedChangesQueue map[string]Route53Changes
}

// AWSConfig contains configuration to create a new AWS provider.
Expand Down Expand Up @@ -240,6 +242,7 @@ func NewAWSProvider(awsConfig AWSConfig) (*AWSProvider, error) {
preferCNAME: awsConfig.PreferCNAME,
dryRun: awsConfig.DryRun,
zonesCache: &zonesListCache{duration: awsConfig.ZoneCacheDuration},
failedChangesQueue: make(map[string]Route53Changes),
}

return provider, nil
Expand Down Expand Up @@ -556,9 +559,16 @@ func (p *AWSProvider) submitChanges(ctx context.Context, changes Route53Changes,
for z, cs := range changesByZone {
var failedUpdate bool

batchCs := batchChangeSet(cs, p.batchChangeSize)
// group changes into new changes and into changes that failed in a previous iteration and are retried
retriedChanges, newChanges := findChangesInQueue(cs, p.failedChangesQueue[z])
p.failedChangesQueue[z] = nil

batchCs := append(batchChangeSet(newChanges, p.batchChangeSize), batchChangeSet(retriedChanges, p.batchChangeSize)...)
for i, b := range batchCs {
if len(b) == 0 {
continue
}

for _, c := range b {
log.Infof("Desired change: %s %s %s [Id: %s]", *c.Action, *c.ResourceRecordSet.Name, *c.ResourceRecordSet.Type, z)
}
Expand All @@ -571,13 +581,41 @@ func (p *AWSProvider) submitChanges(ctx context.Context, changes Route53Changes,
},
}

successfulChanges := 0

if _, err := p.client.ChangeResourceRecordSetsWithContext(ctx, params); err != nil {
log.Errorf("Failure in zone %s [Id: %s]", aws.StringValue(zones[z].Name), z)
log.Error(err) // TODO(ideahitme): consider changing the interface in cases when this error might be a concern for other components
failedUpdate = true
log.Errorf("Failure in zone %s [Id: %s] when submitting change batch: %v", aws.StringValue(zones[z].Name), z, err)

changesByOwnership := groupChangesByNameAndOwnershipRelation(b)

if len(changesByOwnership) > 1 {
log.Debug("Trying to submit change sets one-by-one instead")

for _, changes := range changesByOwnership {
for _, c := range changes {
log.Debugf("Desired change: %s %s %s [Id: %s]", *c.Action, *c.ResourceRecordSet.Name, *c.ResourceRecordSet.Type, z)
}
params.ChangeBatch = &route53.ChangeBatch{
Changes: changes.Route53Changes(),
}
if _, err := p.client.ChangeResourceRecordSetsWithContext(ctx, params); err != nil {
failedUpdate = true
log.Errorf("Failed submitting change (error: %v), it will be retried in a separate change batch in the next iteration", err)
p.failedChangesQueue[z] = append(p.failedChangesQueue[z], changes...)
} else {
successfulChanges = successfulChanges + len(changes)
}
}
} else {
failedUpdate = true
}
} else {
successfulChanges = len(b)
}

if successfulChanges > 0 {
// z is the R53 Hosted Zone ID already as aws.StringValue
log.Infof("%d record(s) in zone %s [Id: %s] were successfully updated", len(b), aws.StringValue(zones[z].Name), z)
log.Infof("%d record(s) in zone %s [Id: %s] were successfully updated", successfulChanges, aws.StringValue(zones[z].Name), z)
}

if i != len(batchCs)-1 {
Expand Down Expand Up @@ -736,9 +774,51 @@ func (p *AWSProvider) newChange(action string, ep *endpoint.Endpoint) (*Route53C
change.ResourceRecordSet.HealthCheckId = aws.String(prop.Value)
}

if ownedRecord, ok := ep.Labels[endpoint.OwnedRecordLabelKey]; ok {
change.OwnedRecord = ownedRecord
}

return change, dualstack
}

// searches for `changes` that are contained in `queue` and returns the `changes` separated by whether they were found in the queue (`foundChanges`) or not (`notFoundChanges`)
func findChangesInQueue(changes Route53Changes, queue Route53Changes) (foundChanges, notFoundChanges Route53Changes) {
if queue == nil {
return Route53Changes{}, changes
}

for _, c := range changes {
found := false
for _, qc := range queue {
if c == qc {
foundChanges = append(foundChanges, c)
found = true
break
}
}
if !found {
notFoundChanges = append(notFoundChanges, c)
}
}

return
}

// group the given changes by name and ownership relation to ensure these are always submitted in the same transaction to Route53;
// grouping by name is done to always submit changes with the same name but different set identifier together,
// grouping by ownership relation is done to always submit changes of records and e.g. their corresponding TXT registry records together
func groupChangesByNameAndOwnershipRelation(cs Route53Changes) map[string]Route53Changes {
changesByOwnership := make(map[string]Route53Changes)
for _, v := range cs {
key := v.OwnedRecord
if key == "" {
key = aws.StringValue(v.ResourceRecordSet.Name)
}
changesByOwnership[key] = append(changesByOwnership[key], v)
}
return changesByOwnership
}

func (p *AWSProvider) tagsForZone(ctx context.Context, zoneID string) (map[string]string, error) {
response, err := p.client.ListTagsForResourceWithContext(ctx, &route53.ListTagsForResourceInput{
ResourceType: aws.String("hostedzone"),
Expand All @@ -762,41 +842,34 @@ func batchChangeSet(cs Route53Changes, batchSize int) []Route53Changes {

batchChanges := make([]Route53Changes, 0)

changesByName := make(map[string]Route53Changes)
for _, v := range cs {
changesByName[*v.ResourceRecordSet.Name] = append(changesByName[*v.ResourceRecordSet.Name], v)
}
changesByOwnership := groupChangesByNameAndOwnershipRelation(cs)

names := make([]string, 0)
for v := range changesByName {
for v := range changesByOwnership {
names = append(names, v)
}
sort.Strings(names)

for _, name := range names {
totalChangesByName := len(changesByName[name])

if totalChangesByName > batchSize {
log.Warnf("Total changes for %s exceeds max batch size of %d, total changes: %d", name,
batchSize, totalChangesByName)
currentBatch := Route53Changes{}
for k, name := range names {
v := changesByOwnership[name]
if len(v) > batchSize {
log.Warnf("Total changes for %v exceeds max batch size of %d, total changes: %d; changes will not be performed", k, batchSize, len(v))
continue
}

var existingBatch bool
for i, b := range batchChanges {
if len(b)+totalChangesByName <= batchSize {
batchChanges[i] = append(batchChanges[i], changesByName[name]...)
existingBatch = true
break
}
}
if !existingBatch {
batchChanges = append(batchChanges, changesByName[name])
if len(currentBatch)+len(v) > batchSize {
// currentBatch would be too large if we add this changeset;
// add currentBatch to batchChanges and start a new currentBatch
batchChanges = append(batchChanges, sortChangesByActionNameType(currentBatch))
currentBatch = append(Route53Changes{}, v...)
} else {
currentBatch = append(currentBatch, v...)
}
}

for i, batch := range batchChanges {
batchChanges[i] = sortChangesByActionNameType(batch)
if len(currentBatch) > 0 {
// add final currentBatch
batchChanges = append(batchChanges, sortChangesByActionNameType(currentBatch))
}

return batchChanges
Expand Down
69 changes: 69 additions & 0 deletions provider/aws/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,65 @@ func TestAWSsubmitChangesError(t *testing.T) {
require.Error(t, provider.submitChanges(ctx, cs, zones))
}

func TestAWSsubmitChangesRetryOnError(t *testing.T) {
provider, clientStub := newAWSProvider(t, endpoint.NewDomainFilter([]string{"ext-dns-test-2.teapot.zalan.do."}), provider.NewZoneIDFilter([]string{}), provider.NewZoneTypeFilter(""), defaultEvaluateTargetHealth, false, []*endpoint.Endpoint{})

ctx := context.Background()
zones, err := provider.Zones(ctx)
require.NoError(t, err)

ep1 := endpoint.NewEndpointWithTTL("success.zone-1.ext-dns-test-2.teapot.zalan.do", endpoint.RecordTypeA, endpoint.TTL(recordTTL), "1.0.0.1")
ep2 := endpoint.NewEndpointWithTTL("fail.zone-1.ext-dns-test-2.teapot.zalan.do", endpoint.RecordTypeA, endpoint.TTL(recordTTL), "1.0.0.2")
ep3 := endpoint.NewEndpointWithTTL("success2.zone-1.ext-dns-test-2.teapot.zalan.do", endpoint.RecordTypeA, endpoint.TTL(recordTTL), "1.0.0.3")

ep2txt := endpoint.NewEndpointWithTTL("fail__edns_housekeeping.zone-1.ext-dns-test-2.teapot.zalan.do", endpoint.RecordTypeTXT, endpoint.TTL(recordTTL), "something") // "__edns_housekeeping" is the TXT suffix
ep2txt.Labels = map[string]string{
endpoint.OwnedRecordLabelKey: "fail.zone-1.ext-dns-test-2.teapot.zalan.do",
}

// "success" and "fail" are created in the first step, both are submitted in the same batch; this should fail
cs1 := provider.newChanges(route53.ChangeActionCreate, []*endpoint.Endpoint{ep2, ep2txt, ep1})
input1 := &route53.ChangeResourceRecordSetsInput{
HostedZoneId: aws.String("/hostedzone/zone-1.ext-dns-test-2.teapot.zalan.do."),
ChangeBatch: &route53.ChangeBatch{
Changes: cs1.Route53Changes(),
},
}
clientStub.MockMethod("ChangeResourceRecordSets", input1).Return(nil, fmt.Errorf("Mock route53 failure"))

// because of the failure, changes will be retried one by one; make "fail" submitted in its own batch fail as well
cs2 := provider.newChanges(route53.ChangeActionCreate, []*endpoint.Endpoint{ep2, ep2txt})
input2 := &route53.ChangeResourceRecordSetsInput{
HostedZoneId: aws.String("/hostedzone/zone-1.ext-dns-test-2.teapot.zalan.do."),
ChangeBatch: &route53.ChangeBatch{
Changes: cs2.Route53Changes(),
},
}
clientStub.MockMethod("ChangeResourceRecordSets", input2).Return(nil, fmt.Errorf("Mock route53 failure"))

// "success" should have been created, verify that we still get an error because "fail" failed
require.Error(t, provider.submitChanges(ctx, cs1, zones))

// assert that "success" was successfully created and "fail" and its TXT record were not
records, err := provider.Records(ctx)
require.NoError(t, err)
require.True(t, containsRecordWithDNSName(records, "success.zone-1.ext-dns-test-2.teapot.zalan.do"))
require.False(t, containsRecordWithDNSName(records, "fail.zone-1.ext-dns-test-2.teapot.zalan.do"))
require.False(t, containsRecordWithDNSName(records, "fail__edns_housekeeping.zone-1.ext-dns-test-2.teapot.zalan.do"))

// next batch should contain "fail" and "success2", should succeed this time
cs3 := provider.newChanges(route53.ChangeActionCreate, []*endpoint.Endpoint{ep2, ep2txt, ep3})
require.NoError(t, provider.submitChanges(ctx, cs3, zones))

// verify all records are there
records, err = provider.Records(ctx)
require.NoError(t, err)
require.True(t, containsRecordWithDNSName(records, "success.zone-1.ext-dns-test-2.teapot.zalan.do"))
require.True(t, containsRecordWithDNSName(records, "fail.zone-1.ext-dns-test-2.teapot.zalan.do"))
require.True(t, containsRecordWithDNSName(records, "success2.zone-1.ext-dns-test-2.teapot.zalan.do"))
require.True(t, containsRecordWithDNSName(records, "fail__edns_housekeeping.zone-1.ext-dns-test-2.teapot.zalan.do"))
}

func TestAWSBatchChangeSet(t *testing.T) {
var cs Route53Changes

Expand Down Expand Up @@ -1375,6 +1434,7 @@ func newAWSProviderWithTagFilter(t *testing.T, domainFilter endpoint.DomainFilte
zoneTagFilter: zoneTagFilter,
dryRun: false,
zonesCache: &zonesListCache{duration: 1 * time.Minute},
failedChangesQueue: make(map[string]Route53Changes),
}

createAWSZone(t, provider, &route53.HostedZone{
Expand Down Expand Up @@ -1449,6 +1509,15 @@ func validateRecords(t *testing.T, records []*route53.ResourceRecordSet, expecte
assert.ElementsMatch(t, expected, records)
}

func containsRecordWithDNSName(records []*endpoint.Endpoint, dnsName string) bool {
for _, record := range records {
if record.DNSName == dnsName {
return true
}
}
return false
}

func TestRequiresDeleteCreate(t *testing.T) {
provider, _ := newAWSProvider(t, endpoint.NewDomainFilter([]string{"foo.bar."}), provider.NewZoneIDFilter([]string{}), provider.NewZoneTypeFilter(""), defaultEvaluateTargetHealth, false, []*endpoint.Endpoint{})

Expand Down
2 changes: 2 additions & 0 deletions registry/txt.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (im *TXTRegistry) generateTXTRecord(r *endpoint.Endpoint) []*endpoint.Endpo
txt := endpoint.NewEndpoint(im.mapper.toTXTName(r.DNSName), endpoint.RecordTypeTXT, r.Labels.Serialize(true))
if txt != nil {
txt.WithSetIdentifier(r.SetIdentifier)
txt.Labels[endpoint.OwnedRecordLabelKey] = r.DNSName
txt.ProviderSpecific = r.ProviderSpecific
endpoints = append(endpoints, txt)
}
Expand All @@ -206,6 +207,7 @@ func (im *TXTRegistry) generateTXTRecord(r *endpoint.Endpoint) []*endpoint.Endpo
txtNew := endpoint.NewEndpoint(im.mapper.toNewTXTName(r.DNSName, r.RecordType), endpoint.RecordTypeTXT, r.Labels.Serialize(true))
if txtNew != nil {
txtNew.WithSetIdentifier(r.SetIdentifier)
txtNew.Labels[endpoint.OwnedRecordLabelKey] = r.DNSName
txtNew.ProviderSpecific = r.ProviderSpecific
endpoints = append(endpoints, txtNew)
}
Expand Down

0 comments on commit 7dd84a5

Please sign in to comment.