Skip to content

Commit

Permalink
authorize: build evaluators cache in parallel (#4731)
Browse files Browse the repository at this point in the history
authorize: build evaluators cache in parallel (#4722)

* authorize: build evaluators cache in parallel

* session: add unit tests for gRPC wrapper methods (#4713)

* core/config: add support for maps in environments (#4717)

* reconciler: allow custom comparison function (#4726)

* add loopvar alias

---------

Co-authored-by: Denis Mishin <dmishin@pomerium.com>
Co-authored-by: Kenneth Jenkins <51246568+kenjenkins@users.noreply.github.com>
Co-authored-by: Caleb Doxsey <cdoxsey@pomerium.com>
  • Loading branch information
4 people committed Nov 9, 2023
1 parent 34187e8 commit 6cec77b
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 30 deletions.
46 changes: 38 additions & 8 deletions authorize/evaluator/evaluator.go
Expand Up @@ -7,13 +7,15 @@ import (
"fmt"
"net/http"
"net/url"
"time"

"github.com/go-jose/go-jose/v3"
"github.com/open-policy-agent/opa/rego"
"golang.org/x/sync/errgroup"

"github.com/pomerium/pomerium/authorize/internal/store"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/errgrouputil"
"github.com/pomerium/pomerium/internal/httputil"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/trace"
Expand Down Expand Up @@ -139,12 +141,20 @@ func New(
return e, nil
}

type routeEvaluator struct {
id uint64
evaluator *PolicyEvaluator
}

func getOrCreatePolicyEvaluators(
ctx context.Context, cfg *evaluatorConfig, store *store.Store,
cachedPolicyEvaluators map[uint64]*PolicyEvaluator,
) (map[uint64]*PolicyEvaluator, error) {
var newCount, reusedCount int
now := time.Now()

var reusedCount int
m := make(map[uint64]*PolicyEvaluator)
var builders []errgrouputil.BuilderFunc[routeEvaluator]
for i := range cfg.Policies {
configPolicy := cfg.Policies[i]
id, err := configPolicy.RouteID()
Expand All @@ -157,15 +167,35 @@ func getOrCreatePolicyEvaluators(
reusedCount++
continue
}
policyEvaluator, err :=
NewPolicyEvaluator(ctx, store, &configPolicy, cfg.AddDefaultClientCertificateRule)
if err != nil {
return nil, err
builders = append(builders, func(ctx context.Context) (*routeEvaluator, error) {
evaluator, err := NewPolicyEvaluator(ctx, store, &configPolicy, cfg.AddDefaultClientCertificateRule)
if err != nil {
return nil, fmt.Errorf("authorize: error building evaluator for route id=%s: %w", configPolicy.ID, err)
}
return &routeEvaluator{
id: id,
evaluator: evaluator,
}, nil
})
}

evals, errs := errgrouputil.Build(ctx, builders...)
if len(errs) > 0 {
for _, err := range errs {
log.Error(ctx).Msg(err.Error())
}
m[id] = policyEvaluator
newCount++
return nil, fmt.Errorf("authorize: error building policy evaluators")
}
log.Info(ctx).Msgf("updated policy evaluators: %d created, %d reused", newCount, reusedCount)

for _, p := range evals {
m[p.id] = p.evaluator
}

log.Info(ctx).
Dur("duration", time.Since(now)).
Int("reused-policies", reusedCount).
Int("created-policies", len(cfg.Policies)-reusedCount).
Msg("updated policy evaluators")
return m, nil
}

Expand Down
45 changes: 23 additions & 22 deletions internal/databroker/config_source.go
Expand Up @@ -3,7 +3,6 @@ package databroker
import (
"context"
"fmt"
"runtime"
"sort"
"sync"
"time"
Expand All @@ -12,6 +11,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/errgrouputil"
"github.com/pomerium/pomerium/internal/hashutil"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
Expand Down Expand Up @@ -115,7 +115,6 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {

func (src *ConfigSource) buildNewConfigLocked(ctx context.Context, cfg *config.Config) error {
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(runtime.NumCPU()/2 + 1)
eg.Go(func() error {
src.applySettingsLocked(ctx, cfg)
err := cfg.Options.Validate()
Expand All @@ -125,30 +124,32 @@ func (src *ConfigSource) buildNewConfigLocked(ctx context.Context, cfg *config.C
return nil
})

var policies []*config.Policy
var builders []func() error
buildPolicy := func(i int, routepb *configpb.Route) func() error {
return func() error {
policy, err := src.buildPolicyFromProto(routepb)
if err != nil {
log.Ctx(ctx).Err(err).Msg("databroker: error building policy from protobuf")
return nil
}
policies[i] = policy
return nil
}
}

var policyBuilders []errgrouputil.BuilderFunc[config.Policy]
for _, cfgpb := range src.dbConfigs {
for _, routepb := range cfgpb.GetRoutes() {
builders = append(builders, buildPolicy(len(builders), routepb))
routepb := routepb
policyBuilders = append(policyBuilders, func(ctx context.Context) (*config.Policy, error) {
p, err := src.buildPolicyFromProto(ctx, routepb)
if err != nil {
return nil, fmt.Errorf("error building route id=%s: %w", routepb.GetId(), err)
}
return p, nil
})
}
}

policies = make([]*config.Policy, len(builders))
for _, builder := range builders {
eg.Go(builder)
}
var policies []*config.Policy
eg.Go(func() error {
var errs []error
policies, errs = errgrouputil.Build(ctx, policyBuilders...)
if len(errs) > 0 {
for _, err := range errs {
log.Error(ctx).Msg(err.Error())
}
return fmt.Errorf("error building policies")
}
return nil
})

err := eg.Wait()
if err != nil {
Expand Down Expand Up @@ -177,7 +178,7 @@ func (src *ConfigSource) applySettingsLocked(ctx context.Context, cfg *config.Co
}
}

func (src *ConfigSource) buildPolicyFromProto(routepb *configpb.Route) (*config.Policy, error) {
func (src *ConfigSource) buildPolicyFromProto(_ context.Context, routepb *configpb.Route) (*config.Policy, error) {
policy, err := config.NewPolicyFromProto(routepb)
if err != nil {
return nil, fmt.Errorf("error building policy from protobuf: %w", err)
Expand Down
54 changes: 54 additions & 0 deletions internal/errgrouputil/builder.go
@@ -0,0 +1,54 @@
// Package errgrouputil contains methods for working with errgroup code.
package errgrouputil

import (
"context"
"runtime"

"golang.org/x/sync/errgroup"

"github.com/pomerium/pomerium/pkg/slices"
)

// BuilderFunc is a function that builds a value of type T
type BuilderFunc[T any] func(ctx context.Context) (*T, error)

// Build builds a slice of values of type T using the provided builders concurrently
// and returns the results and any errors.
func Build[T any](
ctx context.Context,
builders ...BuilderFunc[T],
) ([]*T, []error) {
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(runtime.NumCPU()/2 + 1)

results := make([]*T, len(builders))
errors := make([]error, len(builders))

fn := func(i int) func() error {
return func() error {
result, err := builders[i](ctx)
if err != nil {
errors[i] = err
return nil
}
results[i] = result
return nil
}
}

for i := range builders {
eg.Go(fn(i))
}

err := eg.Wait()
if err != nil {
return nil, []error{err} // not happening
}

return slices.Filter(results, func(t *T) bool {
return t != nil
}), slices.Filter(errors, func(err error) bool {
return err != nil
})
}

0 comments on commit 6cec77b

Please sign in to comment.