Skip to content

Commit

Permalink
core/identity: add enabler (#5084)
Browse files Browse the repository at this point in the history
* core/identity: add disabler

* enable by default

* add name

* rename to enabler, use mutex instead of goroutine

* rename method, add comments
  • Loading branch information
calebdoxsey committed Apr 26, 2024
1 parent a518435 commit 99a5dbd
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 4 deletions.
108 changes: 108 additions & 0 deletions internal/enabler/enabler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// package enabler contains a component that can be enabled and disabled dynamically
package enabler

import (
"context"
"errors"
"sync"

"github.com/pomerium/pomerium/internal/log"
)

var errCauseEnabler = errors.New("enabler")

// A Handler is a component with a RunEnabled function.
type Handler interface {
RunEnabled(ctx context.Context) error
}

// HandlerFunc is a function run by the enabler.
type HandlerFunc func(ctx context.Context) error

func (f HandlerFunc) RunEnabled(ctx context.Context) error {
return f(ctx)
}

// An Enabler enables or disables a component dynamically.
// When the Enabler is enabled, the Handler's RunEnabled will be called.
// If the Enabler is subsequently disabled the context passed to RunEnabled will be canceled.
// If the Enabler is subseqently enabled again, RunEnabled will be called again.
// Handlers should obey the context lifetime and be tolerant of RunEnabled
// being called multiple times. (not concurrently)
type Enabler interface {
Run(ctx context.Context) error
Enable()
Disable()
}

type enabler struct {
name string
handler Handler

mu sync.Mutex
cancel context.CancelCauseFunc
enabled bool
}

// New creates a new Enabler.
func New(name string, handler Handler, enabled bool) Enabler {
d := &enabler{
name: name,
handler: handler,
enabled: enabled,
cancel: func(_ error) {},
}
return d
}

// Run calls RunEnabled if enabled, otherwise it waits until enabled.
func (d *enabler) Run(ctx context.Context) error {
for {
err := d.runOrWaitForEnabled(ctx)
// if we received any error but our own, exit with that error
if !errors.Is(err, errCauseEnabler) {
return err
}
}
}

func (d *enabler) runOrWaitForEnabled(ctx context.Context) error {
d.mu.Lock()
enabled := d.enabled
ctx, d.cancel = context.WithCancelCause(ctx)
d.mu.Unlock()

// we're enabled so call RunEnabled. If Disabled is called it will cancel ctx.
if enabled {
log.Ctx(ctx).Info().Msgf("enabled %s", d.name)
err := d.handler.RunEnabled(ctx)
// if RunEnabled stopped because we canceled the context
if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ctx), errCauseEnabler) {
log.Ctx(ctx).Info().Msgf("disabled %s", d.name)
return errCauseEnabler
}
return err
}

// wait until Enabled is called
<-ctx.Done()
return context.Cause(ctx)
}

func (d *enabler) Enable() {
d.mu.Lock()
if !d.enabled {
d.enabled = true
d.cancel(errCauseEnabler)
}
d.mu.Unlock()
}

func (d *enabler) Disable() {
d.mu.Lock()
if d.enabled {
d.enabled = false
d.cancel(errCauseEnabler)
}
d.mu.Unlock()
}
61 changes: 61 additions & 0 deletions internal/enabler/enabler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package enabler_test

import (
"context"
"errors"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/pomerium/pomerium/internal/enabler"
)

func TestEnabler(t *testing.T) {
t.Parallel()

t.Run("enabled immediately", func(t *testing.T) {
t.Parallel()

e := enabler.New("test", enabler.HandlerFunc(func(ctx context.Context) error {
return errors.New("ERROR")
}), true)
err := e.Run(context.Background())
assert.Error(t, err)
})
t.Run("enabled delayed", func(t *testing.T) {
t.Parallel()

e := enabler.New("test", enabler.HandlerFunc(func(ctx context.Context) error {
return errors.New("ERROR")
}), false)
time.AfterFunc(time.Millisecond*10, e.Enable)
err := e.Run(context.Background())
assert.Error(t, err)
})
t.Run("disabled", func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

var started, stopped atomic.Int64
e := enabler.New("test", enabler.HandlerFunc(func(ctx context.Context) error {
started.Add(1)
<-ctx.Done()
stopped.Add(1)
return ctx.Err()
}), true)
time.AfterFunc(time.Millisecond*10, e.Disable)
go e.Run(ctx)

assert.Eventually(t, func() bool { return stopped.Load() == 1 }, time.Second, time.Millisecond*100,
"should stop RunEnabled")

e.Enable()

assert.Eventually(t, func() bool { return started.Load() == 2 }, time.Second, time.Millisecond*100,
"should re-start RunEnabled")
})
}
13 changes: 11 additions & 2 deletions internal/identity/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ type config struct {
sessionRefreshCoolOffDuration time.Duration
now func() time.Time
eventMgr *events.Manager
enabled bool
}

func newConfig(options ...Option) *config {
cfg := new(config)
WithSessionRefreshGracePeriod(defaultSessionRefreshGracePeriod)(cfg)
WithSessionRefreshCoolOffDuration(defaultSessionRefreshCoolOffDuration)(cfg)
WithNow(time.Now)(cfg)
WithEnabled(true)(cfg)
for _, option := range options {
option(cfg)
}
Expand Down Expand Up @@ -72,7 +74,14 @@ func WithNow(now func() time.Time) Option {

// WithEventManager passes an event manager to record events
func WithEventManager(mgr *events.Manager) Option {
return func(c *config) {
c.eventMgr = mgr
return func(cfg *config) {
cfg.eventMgr = mgr
}
}

// WithEnabled sets the enabled option in the config.
func WithEnabled(enabled bool) Option {
return func(cfg *config) {
cfg.enabled = enabled
}
}
12 changes: 10 additions & 2 deletions internal/identity/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/pomerium/pomerium/internal/atomicutil"
"github.com/pomerium/pomerium/internal/enabler"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/identity/identity"
"github.com/pomerium/pomerium/internal/log"
Expand Down Expand Up @@ -43,6 +44,7 @@ type (

// A Manager refreshes identity information using session and user data.
type Manager struct {
enabler.Enabler
cfg *atomicutil.Value[*config]

sessionScheduler *scheduler.Scheduler
Expand All @@ -62,6 +64,7 @@ func New(
sessionScheduler: scheduler.New(),
userScheduler: scheduler.New(),
}
mgr.Enabler = enabler.New("identity_manager", mgr, true)
mgr.reset()
mgr.UpdateConfig(options...)
return mgr
Expand All @@ -76,10 +79,15 @@ func withLog(ctx context.Context) context.Context {
// UpdateConfig updates the manager with the new options.
func (mgr *Manager) UpdateConfig(options ...Option) {
mgr.cfg.Store(newConfig(options...))
if mgr.cfg.Load().enabled {
mgr.Enable()
} else {
mgr.Disable()
}
}

// Run runs the manager. This method blocks until an error occurs or the given context is canceled.
func (mgr *Manager) Run(ctx context.Context) error {
// RunEnabled runs the manager. This method blocks until an error occurs or the given context is canceled.
func (mgr *Manager) RunEnabled(ctx context.Context) error {
leaser := databroker.NewLeaser("identity_manager", time.Second*30, mgr)
return leaser.Run(ctx)
}
Expand Down

0 comments on commit 99a5dbd

Please sign in to comment.