-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SDK extensibility: add hooks (plugins, discovery, sdk) #6053
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
// Copyright 2023 The OPA Authors. All rights reserved. | ||
// Use of this source code is governed by an Apache2 | ||
// license that can be found in the LICENSE file. | ||
|
||
package hooks | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/open-policy-agent/opa/config" | ||
) | ||
|
||
// Hook is a hook to be called in some select places in OPA's operation. | ||
// | ||
// The base Hook interface is any, and wherever a hook can occur, the calling code | ||
// will check if your hook implements an appropriate interface. If so, your hook | ||
// is called. | ||
// | ||
// This allows you to only hook in to behavior you care about, and it allows the | ||
// OPA to add more hooks in the future. | ||
// | ||
// All hook interfaces in this package have Hook in the name. Hooks must be safe | ||
// for concurrent use. It is expected that hooks are fast; if a hook needs to take | ||
// time, then copy what you need and ensure the hook is async. | ||
// | ||
// When multiple instances of a hook are provided, they are all going to be executed | ||
// in an unspecified order (it's a map-range call underneath). If you need hooks to | ||
// be run in order, you can wrap them into another hook, and configure that one. | ||
type Hook any | ||
|
||
// Hooks is the type used for every struct in OPA that can work with hooks. | ||
type Hooks struct { | ||
m map[Hook]struct{} // we are NOT providing a stable invocation ordering | ||
} | ||
|
||
// New creates a new instance of Hooks. | ||
func New(hs ...Hook) Hooks { | ||
h := Hooks{m: make(map[Hook]struct{}, len(hs))} | ||
for i := range hs { | ||
h.m[hs[i]] = struct{}{} | ||
} | ||
return h | ||
} | ||
|
||
func (hs Hooks) Each(fn func(Hook)) { | ||
for h := range hs.m { | ||
fn(h) | ||
} | ||
} | ||
|
||
// ConfigHook allows inspecting or rewriting the configuration when the plugin | ||
// manager is processing it. | ||
// Note that this hook is not run when the plugin manager is reconfigured. This | ||
// usually only happens when there's a new config from a discovery bundle, and | ||
// for processing _that_, there's `ConfigDiscoveryHook`. | ||
type ConfigHook interface { | ||
OnConfig(context.Context, *config.Config) (*config.Config, error) | ||
} | ||
|
||
// ConfigHook allows inspecting or rewriting the discovered configuration when | ||
// the discovery plugin is processing it. | ||
type ConfigDiscoveryHook interface { | ||
OnConfigDiscovery(context.Context, *config.Config) (*config.Config, error) | ||
} | ||
|
||
func (hs Hooks) Validate() error { | ||
for h := range hs.m { | ||
switch h.(type) { | ||
case ConfigHook, | ||
ConfigDiscoveryHook: // OK | ||
default: | ||
return fmt.Errorf("unknown hook type %T", h) | ||
} | ||
} | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
// Copyright 2022 The Go Authors. All rights reserved. | ||
// Use of this source code is governed by a BSD-style | ||
// license that can be found in the LICENSE file. | ||
|
||
//go:build !go1.20 | ||
|
||
package errors | ||
|
||
// Join returns an error that wraps the given errors. | ||
// Any nil error values are discarded. | ||
// Join returns nil if errs contains no non-nil values. | ||
// The error formats as the concatenation of the strings obtained | ||
// by calling the Error method of each element of errs, with a newline | ||
// between each string. | ||
func Join(errs ...error) error { | ||
n := 0 | ||
for _, err := range errs { | ||
if err != nil { | ||
n++ | ||
} | ||
} | ||
if n == 0 { | ||
return nil | ||
} | ||
e := &joinError{ | ||
errs: make([]error, 0, n), | ||
} | ||
for _, err := range errs { | ||
if err != nil { | ||
e.errs = append(e.errs, err) | ||
} | ||
} | ||
return e | ||
} | ||
|
||
type joinError struct { | ||
errs []error | ||
} | ||
|
||
func (e *joinError) Error() string { | ||
var b []byte | ||
for i, err := range e.errs { | ||
if i > 0 { | ||
b = append(b, '\n') | ||
} | ||
b = append(b, err.Error()...) | ||
} | ||
return string(b) | ||
} | ||
|
||
func (e *joinError) Unwrap() []error { | ||
return e.errs | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
//go:build go1.20 | ||
|
||
package errors | ||
|
||
import "errors" | ||
|
||
var Join = errors.Join |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,10 @@ import ( | |
"github.com/open-policy-agent/opa/ast" | ||
"github.com/open-policy-agent/opa/bundle" | ||
"github.com/open-policy-agent/opa/config" | ||
"github.com/open-policy-agent/opa/hooks" | ||
bundleUtils "github.com/open-policy-agent/opa/internal/bundle" | ||
cfg "github.com/open-policy-agent/opa/internal/config" | ||
"github.com/open-policy-agent/opa/internal/errors" | ||
initload "github.com/open-policy-agent/opa/internal/runtime/init" | ||
"github.com/open-policy-agent/opa/keys" | ||
"github.com/open-policy-agent/opa/loader" | ||
|
@@ -196,6 +198,7 @@ type Manager struct { | |
distributedTacingOpts tracing.Options | ||
registeredNDCacheTriggers []func(bool) | ||
bootstrapConfigLabels map[string]string | ||
hooks hooks.Hooks | ||
} | ||
|
||
type managerContextKey string | ||
|
@@ -374,6 +377,13 @@ func WithDistributedTracingOpts(tr tracing.Options) func(*Manager) { | |
} | ||
} | ||
|
||
// WithHooks allows passing hooks to the plugin manager. | ||
func WithHooks(hs hooks.Hooks) func(*Manager) { | ||
return func(m *Manager) { | ||
m.hooks = hs | ||
} | ||
} | ||
|
||
// New creates a new Manager using config. | ||
func New(raw []byte, id string, store storage.Store, opts ...func(*Manager)) (*Manager, error) { | ||
|
||
|
@@ -382,27 +392,15 @@ func New(raw []byte, id string, store storage.Store, opts ...func(*Manager)) (*M | |
return nil, err | ||
} | ||
|
||
keys, err := keys.ParseKeysConfig(parsedConfig.Keys) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
interQueryBuiltinCacheConfig, err := cache.ParseCachingConfig(parsedConfig.Caching) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
m := &Manager{ | ||
Store: store, | ||
Config: parsedConfig, | ||
ID: id, | ||
keys: keys, | ||
pluginStatus: map[string]*Status{}, | ||
pluginStatusListeners: map[string]StatusListener{}, | ||
maxErrors: -1, | ||
interQueryBuiltinCacheConfig: interQueryBuiltinCacheConfig, | ||
serverInitialized: make(chan struct{}), | ||
bootstrapConfigLabels: parsedConfig.Labels, | ||
Store: store, | ||
Config: parsedConfig, | ||
ID: id, | ||
pluginStatus: map[string]*Status{}, | ||
pluginStatusListeners: map[string]StatusListener{}, | ||
maxErrors: -1, | ||
serverInitialized: make(chan struct{}), | ||
bootstrapConfigLabels: parsedConfig.Labels, | ||
Comment on lines
395
to
+403
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [comment]: This change makes sense in light of what happens a few lines further down in this function; we're delaying populating some of the Manager's fields until after all the overrides have happened. |
||
} | ||
|
||
for _, f := range opts { | ||
|
@@ -417,21 +415,43 @@ func New(raw []byte, id string, store storage.Store, opts ...func(*Manager)) (*M | |
m.consoleLogger = logging.New() | ||
} | ||
|
||
m.hooks.Each(func(h hooks.Hook) { | ||
if f, ok := h.(hooks.ConfigHook); ok { | ||
if c, e := f.OnConfig(context.Background(), parsedConfig); e != nil { | ||
err = errors.Join(err, e) | ||
} else { | ||
parsedConfig = c | ||
} | ||
} | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// do after options and overrides | ||
m.keys, err = keys.ParseKeysConfig(parsedConfig.Keys) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
m.interQueryBuiltinCacheConfig, err = cache.ParseCachingConfig(parsedConfig.Caching) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
serviceOpts := cfg.ServiceOptions{ | ||
Raw: parsedConfig.Services, | ||
AuthPlugin: m.AuthPlugin, | ||
Keys: keys, | ||
Keys: m.keys, | ||
Logger: m.logger, | ||
DistributedTacingOpts: m.distributedTacingOpts, | ||
} | ||
|
||
services, err := cfg.ParseServicesConfig(serviceOpts) | ||
m.services, err = cfg.ParseServicesConfig(serviceOpts) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
m.services = services | ||
|
||
return m, nil | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ import ( | |
|
||
"github.com/open-policy-agent/opa/ast" | ||
"github.com/open-policy-agent/opa/bundle" | ||
"github.com/open-policy-agent/opa/hooks" | ||
"github.com/open-policy-agent/opa/internal/ref" | ||
"github.com/open-policy-agent/opa/internal/runtime" | ||
"github.com/open-policy-agent/opa/internal/uuid" | ||
|
@@ -44,6 +45,7 @@ type OPA struct { | |
console logging.Logger | ||
plugins map[string]plugins.Factory | ||
store storage.Store | ||
hooks hooks.Hooks | ||
config []byte | ||
} | ||
|
||
|
@@ -74,6 +76,7 @@ func New(ctx context.Context, opts Options) (*OPA, error) { | |
opa := &OPA{ | ||
id: id, | ||
store: opts.Store, | ||
hooks: opts.Hooks, | ||
state: &state{ | ||
queryCache: newQueryCache(), | ||
}, | ||
|
@@ -133,7 +136,9 @@ func (opa *OPA) configure(ctx context.Context, bs []byte, ready chan struct{}, b | |
plugins.Logger(opa.logger), | ||
plugins.ConsoleLogger(opa.console), | ||
plugins.EnablePrintStatements(opa.logger.GetLevel() >= logging.Info), | ||
plugins.PrintHook(loggingPrintHook{logger: opa.logger})) | ||
plugins.PrintHook(loggingPrintHook{logger: opa.logger}), | ||
plugins.WithHooks(opa.hooks), | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -167,7 +172,10 @@ func (opa *OPA) configure(ctx context.Context, bs []byte, ready chan struct{}, b | |
close(ready) | ||
}) | ||
|
||
d, err := discovery.New(manager, discovery.Factories(opa.plugins)) | ||
d, err := discovery.New(manager, | ||
discovery.Factories(opa.plugins), | ||
discovery.Hooks(opa.hooks), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [comment]: "Where the rubber hits the road" for the PR: the hooks are finally wired in next to the places they're used. This is pleasantly clean to look at. |
||
) | ||
if err != nil { | ||
return err | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This hook is expected to run only when a new manager is created not on reconfigure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Maybe it should. I'll look into it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a note to the comment.