Skip to content

Commit

Permalink
Purger: compute throughput values from number of instances (#7502)
Browse files Browse the repository at this point in the history
Give akamai-purger a new "Throughput.TotalInstances" config value, to
inform it how many instances of itself are competing for akamai rate
limit quote. Combine the `useOptimizedDefaults` and `validate` functions
into a single `optimizeAndValidate` function which sets default values
according to the number of active instances, and confirms that the
results still fall within the rate limits.

Fixes #7487
  • Loading branch information
aarongable committed May 24, 2024
1 parent 5be3650 commit 0d8efb9
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 66 deletions.
70 changes: 46 additions & 24 deletions cmd/akamai-purger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,33 +68,43 @@ type Throughput struct {
// purge request. One cached OCSP response is composed of 3 URLs totaling <
// 400 bytes. If this value isn't provided it will default to
// 'defaultQueueEntriesPerBatch'.
QueueEntriesPerBatch int
//
// Deprecated: Only set TotalInstances and let it compute the defaults.
QueueEntriesPerBatch int `validate:"min=0"`

// PurgeBatchInterval is the duration waited between dispatching an Akamai
// purge request containing 'QueueEntriesPerBatch' * 3 URLs. If this value
// isn't provided it will default to 'defaultPurgeBatchInterval'.
//
// Deprecated: Only set TotalInstances and let it compute the defaults.
PurgeBatchInterval config.Duration `validate:"-"`
}

func (t *Throughput) useOptimizedDefaults() {
if t.QueueEntriesPerBatch == 0 {
t.QueueEntriesPerBatch = defaultEntriesPerBatch
}
if t.PurgeBatchInterval.Duration == 0 {
t.PurgeBatchInterval.Duration = defaultPurgeBatchInterval
}
// TotalInstances is the number of akamai-purger instances running at the same
// time, across all data centers.
TotalInstances int `validate:"min=0"`
}

// validate ensures that the provided throughput configuration will not violate
// the Akamai Fast-Purge API limits. For more information see the official
// documentation:
// https://techdocs.akamai.com/purge-cache/reference/rate-limiting
func (t *Throughput) validate() error {
if t.PurgeBatchInterval.Duration == 0 {
return errors.New("'purgeBatchInterval' must be > 0")
// optimizeAndValidate updates a Throughput struct in-place, replacing any unset
// fields with sane defaults and ensuring that the resulting configuration will
// not cause us to exceed Akamai's rate limits.
func (t *Throughput) optimizeAndValidate() error {
// Ideally, this is the only variable actually configured, and we derive
// everything else from here. But if it isn't set, assume only 1 is running.
if t.TotalInstances < 0 {
return errors.New("'totalInstances' must be positive or 0 (for the default)")
} else if t.TotalInstances == 0 {
t.TotalInstances = 1
}
if t.QueueEntriesPerBatch <= 0 {
return errors.New("'queueEntriesPerBatch' must be > 0")

// For the sake of finding a valid throughput solution, we hold the number of
// queue entries sent per purge batch constant. We set 2 entries (6 urls) as
// the default, and historically we have never had a reason to configure a
// different amount. This default ensures we stay well below the maximum
// request size of 50,000 bytes per request.
if t.QueueEntriesPerBatch < 0 {
return errors.New("'queueEntriesPerBatch' must be positive or 0 (for the default)")
} else if t.QueueEntriesPerBatch == 0 {
t.QueueEntriesPerBatch = defaultEntriesPerBatch
}

// Send no more than the 50,000 bytes of objects we’re allotted per request.
Expand All @@ -104,8 +114,21 @@ func (t *Throughput) validate() error {
akamaiBytesPerReqLimit, bytesPerRequest-akamaiBytesPerReqLimit)
}

// Now the purge interval must be set such that we exceed neither the 50 API
// requests per second limit nor the 200 URLs per second limit across all
// concurrent purger instances. We calculated that a value of one request
// every 32ms satisfies both constraints with a bit of breathing room (as long
// as the number of entries per batch is also at its default). By default we
// set this purger's interval to a multiple of 32ms, depending on how many
// other purger instances are running.
if t.PurgeBatchInterval.Duration < 0 {
return errors.New("'purgeBatchInterval' must be positive or 0 (for the default)")
} else if t.PurgeBatchInterval.Duration == 0 {
t.PurgeBatchInterval.Duration = defaultPurgeBatchInterval * time.Duration(t.TotalInstances)
}

// Send no more than the 50 API requests we’re allotted each second.
requestsPerSecond := int(math.Ceil(float64(time.Second) / float64(t.PurgeBatchInterval.Duration)))
requestsPerSecond := int(math.Ceil(float64(time.Second)/float64(t.PurgeBatchInterval.Duration))) * t.TotalInstances
if requestsPerSecond > akamaiAPIReqPerSecondLimit {
return fmt.Errorf("config exceeds Akamai's requests per second limit (%d requests) by %d",
akamaiAPIReqPerSecondLimit, requestsPerSecond-akamaiAPIReqPerSecondLimit)
Expand All @@ -117,6 +140,7 @@ func (t *Throughput) validate() error {
return fmt.Errorf("config exceeds Akamai's URLs per second limit (%d URLs) by %d",
akamaiURLsPerSecondLimit, urlsPurgedPerSecond-akamaiURLsPerSecondLimit)
}

return nil
}

Expand Down Expand Up @@ -304,11 +328,9 @@ func main() {
defer oTelShutdown(context.Background())
logger.Info(cmd.VersionString())

// Unless otherwise specified, use optimized throughput settings.
if (apc.Throughput == Throughput{}) {
apc.Throughput.useOptimizedDefaults()
}
cmd.FailOnError(apc.Throughput.validate(), "")
// Use optimized throughput settings for any that are left unspecified.
err = apc.Throughput.optimizeAndValidate()
cmd.FailOnError(err, "Failed to find valid throughput solution")

if apc.MaxQueueSize == 0 {
apc.MaxQueueSize = defaultQueueSize
Expand Down
123 changes: 83 additions & 40 deletions cmd/akamai-purger/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

akamaipb "github.com/letsencrypt/boulder/akamai/proto"
"github.com/letsencrypt/boulder/config"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/test"
)
Expand All @@ -16,55 +17,97 @@ func TestImplementation(t *testing.T) {
test.AssertImplementsGRPCServer(t, &akamaiPurger{}, akamaipb.UnimplementedAkamaiPurgerServer{})
}

func TestThroughput_validate(t *testing.T) {
type fields struct {
QueueEntriesPerBatch int
PurgeBatchInterval time.Duration
}
func TestThroughput_optimizeAndValidate(t *testing.T) {
dur := func(in time.Duration) config.Duration { return config.Duration{Duration: in} }

tests := []struct {
name string
fields fields
wantErr bool
input Throughput
want Throughput
wantErr string
}{
{"optimized defaults, should succeed",
fields{
QueueEntriesPerBatch: defaultEntriesPerBatch,
PurgeBatchInterval: defaultPurgeBatchInterval},
false,
{
"negative instances",
Throughput{defaultEntriesPerBatch, dur(defaultPurgeBatchInterval), -1},
Throughput{},
"must be positive",
},
{
"negative batch interval",
Throughput{defaultEntriesPerBatch, config.Duration{Duration: -1}, -1},
Throughput{},
"must be positive",
},
{
"negative entries per batch",
Throughput{-1, dur(defaultPurgeBatchInterval), 1},
Throughput{},
"must be positive",
},
{
"empty input computes sane defaults",
Throughput{},
Throughput{defaultEntriesPerBatch, dur(defaultPurgeBatchInterval), 1},
"",
},
{"2ms faster than optimized defaults, should succeed",
fields{
QueueEntriesPerBatch: defaultEntriesPerBatch,
PurgeBatchInterval: defaultPurgeBatchInterval + 2*time.Millisecond},
false,
{
"strict configuration is honored",
Throughput{2, dur(1 * time.Second), 1},
Throughput{2, dur(1 * time.Second), 1},
"",
},
{"exceeds URLs per second by 4 URLs",
fields{
QueueEntriesPerBatch: defaultEntriesPerBatch,
PurgeBatchInterval: 29 * time.Millisecond},
true,
{
"slightly looser configuration still within limits",
Throughput{defaultEntriesPerBatch, dur(defaultPurgeBatchInterval - time.Millisecond), 1},
Throughput{defaultEntriesPerBatch, dur(defaultPurgeBatchInterval - time.Millisecond), 1},
"",
},
{"exceeds bytes per second by 20 bytes",
fields{
QueueEntriesPerBatch: 125,
PurgeBatchInterval: 1 * time.Second},
true,
{
"too many requests per second",
Throughput{QueueEntriesPerBatch: 1, PurgeBatchInterval: dur(19999 * time.Microsecond)},
Throughput{},
"requests per second limit",
},
{"exceeds requests per second by 1 request",
fields{
QueueEntriesPerBatch: 1,
PurgeBatchInterval: 19999 * time.Microsecond},
true,
{
"too many URLs per second",
Throughput{PurgeBatchInterval: dur(29 * time.Millisecond)},
Throughput{},
"URLs per second limit",
},
{
"too many bytes per request",
Throughput{QueueEntriesPerBatch: 125, PurgeBatchInterval: dur(1 * time.Second)},
Throughput{},
"bytes per request limit",
},
{
"two instances computes sane defaults",
Throughput{TotalInstances: 2},
Throughput{defaultEntriesPerBatch, dur(defaultPurgeBatchInterval * 2), 2},
"",
},
{
"too many requests per second across multiple instances",
Throughput{PurgeBatchInterval: dur(defaultPurgeBatchInterval), TotalInstances: 2},
Throughput{},
"requests per second limit",
},
{
"too many entries per second across multiple instances",
Throughput{PurgeBatchInterval: dur(59 * time.Millisecond), TotalInstances: 2},
Throughput{},
"URLs per second limit",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tr := &Throughput{
QueueEntriesPerBatch: tt.fields.QueueEntriesPerBatch,
}
tr.PurgeBatchInterval.Duration = tt.fields.PurgeBatchInterval
if err := tr.validate(); (err != nil) != tt.wantErr {
t.Errorf("Throughput.validate() error = %v, wantErr %v", err, tt.wantErr)
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := tc.input.optimizeAndValidate()
if tc.wantErr != "" {
test.AssertError(t, err, "")
test.AssertContains(t, err.Error(), tc.wantErr)
} else {
test.AssertNotError(t, err, "")
test.AssertEquals(t, tc.input, tc.want)
}
})
}
Expand Down
3 changes: 1 addition & 2 deletions test/config-next/akamai-purger.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
"purgeRetries": 10,
"purgeRetryBackoff": "50ms",
"throughput": {
"queueEntriesPerBatch": 2,
"purgeBatchInterval": "32ms"
"totalInstances": 1
},
"baseURL": "http://localhost:6789",
"clientToken": "its-a-token",
Expand Down

0 comments on commit 0d8efb9

Please sign in to comment.