Skip to content

Commit

Permalink
eliminate global events manager (#3422)
Browse files Browse the repository at this point in the history
  • Loading branch information
wasaga committed Jun 14, 2022
1 parent 6b386f2 commit db42607
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ lint: ## Verifies `golint` passes.
.PHONY: test
test: get-envoy ## Runs the go tests.
@echo "==> $@"
@$(GO) test -tags "$(BUILDTAGS)" $(shell $(GO) list ./... | grep -v vendor | grep -v github.com/pomerium/pomerium/integration)
@$(GO) test -race -tags "$(BUILDTAGS)" $(shell $(GO) list ./... | grep -v vendor | grep -v github.com/pomerium/pomerium/integration)

.PHONY: cover
cover: get-envoy ## Runs go test with coverage
Expand Down
6 changes: 5 additions & 1 deletion databroker/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/internal/envoy/files"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/identity"
"github.com/pomerium/pomerium/internal/identity/manager"
"github.com/pomerium/pomerium/internal/log"
Expand All @@ -33,6 +34,7 @@ import (
type DataBroker struct {
dataBrokerServer *dataBrokerServer
manager *manager.Manager
eventsMgr *events.Manager

localListener net.Listener
localGRPCServer *grpc.Server
Expand All @@ -45,7 +47,7 @@ type DataBroker struct {
}

// New creates a new databroker service.
func New(cfg *config.Config) (*DataBroker, error) {
func New(cfg *config.Config, eventsMgr *events.Manager) (*DataBroker, error) {
localListener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, err
Expand Down Expand Up @@ -100,6 +102,7 @@ func New(cfg *config.Config) (*DataBroker, error) {
localGRPCConnection: localGRPCConnection,
deprecatedCacheClusterDomain: dataBrokerURLs[0].Hostname(),
dataBrokerStorageType: cfg.Options.DataBrokerStorageType,
eventsMgr: eventsMgr,
}
c.Register(c.localGRPCServer)

Expand Down Expand Up @@ -174,6 +177,7 @@ func (c *DataBroker) update(ctx context.Context, cfg *config.Config) error {
manager.WithDataBrokerClient(dataBrokerClient),
manager.WithGroupRefreshInterval(cfg.Options.RefreshDirectoryInterval),
manager.WithGroupRefreshTimeout(cfg.Options.RefreshDirectoryTimeout),
manager.WithEventManager(c.eventsMgr),
}

authenticator, err := identity.NewAuthenticator(oauthOptions)
Expand Down
3 changes: 2 additions & 1 deletion databroker/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/pkg/cryptutil"
)

Expand All @@ -28,7 +29,7 @@ func TestNew(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.opts.Provider = "google"
_, err := New(&config.Config{Options: &tt.opts})
_, err := New(&config.Config{Options: &tt.opts}, events.New())
if (err != nil) != tt.wantErr {
t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
15 changes: 11 additions & 4 deletions internal/cmd/pomerium/pomerium.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pomerium/pomerium/internal/databroker"
"github.com/pomerium/pomerium/internal/envoy"
"github.com/pomerium/pomerium/internal/envoy/files"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/registry"
"github.com/pomerium/pomerium/internal/version"
Expand Down Expand Up @@ -63,8 +64,10 @@ func Run(ctx context.Context, configFile string) error {
traceMgr := config.NewTraceManager(ctx, src)
defer traceMgr.Close()

eventsMgr := events.New()

// setup the control plane
controlPlane, err := controlplane.NewServer(src.GetConfig(), metricsMgr)
controlPlane, err := controlplane.NewServer(src.GetConfig(), metricsMgr, eventsMgr)
if err != nil {
return fmt.Errorf("error creating control plane: %w", err)
}
Expand Down Expand Up @@ -109,7 +112,7 @@ func Run(ctx context.Context, configFile string) error {
}
var dataBrokerServer *databroker_service.DataBroker
if config.IsDataBroker(src.GetConfig().Options.Services) {
dataBrokerServer, err = setupDataBroker(ctx, src, controlPlane)
dataBrokerServer, err = setupDataBroker(ctx, src, controlPlane, eventsMgr)
if err != nil {
return fmt.Errorf("setting up databroker: %w", err)
}
Expand Down Expand Up @@ -189,8 +192,12 @@ func setupAuthorize(ctx context.Context, src config.Source, controlPlane *contro
return svc, nil
}

func setupDataBroker(ctx context.Context, src config.Source, controlPlane *controlplane.Server) (*databroker_service.DataBroker, error) {
svc, err := databroker_service.New(src.GetConfig())
func setupDataBroker(ctx context.Context,
src config.Source,
controlPlane *controlplane.Server,
eventsMgr *events.Manager,
) (*databroker_service.DataBroker, error) {
svc, err := databroker_service.New(src.GetConfig(), eventsMgr)
if err != nil {
return nil, fmt.Errorf("error creating databroker service: %w", err)
}
Expand Down
10 changes: 6 additions & 4 deletions internal/controlplane/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Server struct {
DebugListener net.Listener
DebugRouter *mux.Router
Builder *envoyconfig.Builder
EventsMgr *events.Manager

currentConfig atomicVersionedConfig
name string
Expand All @@ -81,9 +82,10 @@ type Server struct {
}

// NewServer creates a new Server. Listener ports are chosen by the OS.
func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server, error) {
func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager, eventsMgr *events.Manager) (*Server, error) {
srv := &Server{
metricsMgr: metricsMgr,
EventsMgr: eventsMgr,
reproxy: reproxy.New(),
haveSetCapacity: map[string]bool{},
}
Expand Down Expand Up @@ -172,7 +174,7 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server,
return nil, err
}

srv.xdsmgr = xdsmgr.NewManager(res)
srv.xdsmgr = xdsmgr.NewManager(res, eventsMgr)
envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv.GRPCServer, srv.xdsmgr)

return srv, nil
Expand All @@ -182,12 +184,12 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server,
func (srv *Server) Run(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)

handle := events.Register(func(evt events.Event) {
handle := srv.EventsMgr.Register(func(evt events.Event) {
withGRPCBackoff(ctx, func() error {
return srv.storeEvent(ctx, evt)
})
})
defer events.Unregister(handle)
defer srv.EventsMgr.Unregister(handle)

// start the gRPC server
eg.Go(func() error {
Expand Down
12 changes: 8 additions & 4 deletions internal/controlplane/xdsmgr/xdsmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ type Manager struct {
nonceToConfig *lru.Cache

hostname string

events *events.Manager
}

// NewManager creates a new Manager.
func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource) *Manager {
func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource, evt *events.Manager) *Manager {
nonceToConfig, _ := lru.New(maxNonceCacheSize) // the only error they return is when size is negative, which never happens

return &Manager{
Expand All @@ -59,6 +61,8 @@ func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource) *Ma
resources: resources,

hostname: getHostname(),

events: evt,
}
}

Expand Down Expand Up @@ -268,7 +272,7 @@ func (mgr *Manager) nonceToConfigVersion(nonce string) (ver uint64) {
}

func (mgr *Manager) nackEvent(ctx context.Context, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) {
events.Dispatch(&events.EnvoyConfigurationEvent{
mgr.events.Dispatch(&events.EnvoyConfigurationEvent{
Instance: mgr.hostname,
Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_NACK,
Time: timestamppb.Now(),
Expand All @@ -294,7 +298,7 @@ func (mgr *Manager) nackEvent(ctx context.Context, req *envoy_service_discovery_
}

func (mgr *Manager) ackEvent(ctx context.Context, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) {
events.Dispatch(&events.EnvoyConfigurationEvent{
mgr.events.Dispatch(&events.EnvoyConfigurationEvent{
Instance: mgr.hostname,
Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_ACK,
Time: timestamppb.Now(),
Expand All @@ -315,7 +319,7 @@ func (mgr *Manager) ackEvent(ctx context.Context, req *envoy_service_discovery_v
}

func (mgr *Manager) changeEvent(ctx context.Context, res *envoy_service_discovery_v3.DeltaDiscoveryResponse) {
events.Dispatch(&events.EnvoyConfigurationEvent{
mgr.events.Dispatch(&events.EnvoyConfigurationEvent{
Instance: mgr.hostname,
Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_RESPONSE,
Time: timestamppb.Now(),
Expand Down
3 changes: 2 additions & 1 deletion internal/controlplane/xdsmgr/xdsmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/test/bufconn"

"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/signal"
)

Expand All @@ -36,7 +37,7 @@ func TestManager(t *testing.T) {
typeURL: {
{Name: "r1", Version: "1"},
},
})
}, events.New())
envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, mgr)

li := bufconn.Listen(bufSize)
Expand Down
15 changes: 0 additions & 15 deletions internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,6 @@ type EventSink func(Event)
// An EventSinkHandle is a reference to a registered EventSink so that it can be unregistered.
type EventSinkHandle string

// Dispatch dispatches an event to any event sinks.
func Dispatch(evt Event) {
defaultManager.Dispatch(evt)
}

// Register registers a new sink to receive events.
func Register(sink EventSink) EventSinkHandle {
return defaultManager.Register(sink)
}

// Unregister unregisters a sink so it will no longer receive events.
func Unregister(sinkHandle EventSinkHandle) {
defaultManager.Unregister(sinkHandle)
}

type (
// EnvoyConfigurationEvent re-exports events.EnvoyConfigurationEvent.
EnvoyConfigurationEvent = events.EnvoyConfigurationEvent
Expand Down
20 changes: 14 additions & 6 deletions internal/events/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"github.com/pomerium/pomerium/internal/log"
)

var defaultManager = New()

// A Manager manages the dispatching of events to event sinks.
type Manager struct {
mu sync.RWMutex
Expand All @@ -27,21 +25,31 @@ func New() *Manager {
// Dispatch dispatches an event to any registered event sinks.
func (mgr *Manager) Dispatch(evt Event) {
mgr.mu.RLock()
dropped := mgr.dispatchLocked(evt)
mgr.mu.RUnlock()

if dropped {
log.Warn(context.Background()).
Interface("event", evt).
Msg("controlplane: dropping event due to full channel")
}
}

func (mgr *Manager) dispatchLocked(evt Event) bool {
sinks := make([]chan Event, 0, len(mgr.sinks))
for _, sink := range mgr.sinks {
sinks = append(sinks, sink)
}
mgr.mu.RUnlock()

dropped := false
for _, sink := range sinks {
select {
case sink <- evt:
default:
log.Warn(context.Background()).
Interface("event", evt).
Msg("controlplane: dropping event due to full channel")
dropped = true
}
}
return dropped
}

// Register registers an event sink to receive events.
Expand Down
9 changes: 9 additions & 0 deletions internal/identity/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)

Expand All @@ -24,6 +25,7 @@ type config struct {
sessionRefreshGracePeriod time.Duration
sessionRefreshCoolOffDuration time.Duration
now func() time.Time
eventMgr *events.Manager
}

func newConfig(options ...Option) *config {
Expand Down Expand Up @@ -98,6 +100,13 @@ func WithNow(now func() time.Time) Option {
}
}

// WithEventManager passes an event manager to record events
func WithEventManager(mgr *events.Manager) Option {
return func(c *config) {
c.eventMgr = mgr
}
}

type atomicConfig struct {
value atomic.Value
}
Expand Down
21 changes: 21 additions & 0 deletions internal/identity/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/identity/identity"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/scheduler"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/pomerium/pomerium/pkg/grpc/session"
"github.com/pomerium/pomerium/pkg/grpc/user"
"github.com/pomerium/pomerium/pkg/grpcutil"
metrics_ids "github.com/pomerium/pomerium/pkg/metrics"
"github.com/pomerium/pomerium/pkg/protoutil"
)

Expand Down Expand Up @@ -208,6 +210,7 @@ func (mgr *Manager) refreshDirectoryUserGroups(ctx context.Context) (nextRefresh

directoryGroups, directoryUsers, err := mgr.cfg.Load().directory.UserGroups(ctx)
metrics.RecordIdentityManagerUserGroupRefresh(ctx, err)
mgr.recordLastError(metrics_ids.IdentityManagerLastUserGroupRefreshError, err)
if err != nil {
msg := "failed to refresh directory users and groups"
if ctx.Err() != nil {
Expand Down Expand Up @@ -356,6 +359,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string

newToken, err := mgr.cfg.Load().authenticator.Refresh(ctx, FromOAuthToken(s.OauthToken), &s)
metrics.RecordIdentityManagerSessionRefresh(ctx, err)
mgr.recordLastError(metrics_ids.IdentityManagerLastSessionRefreshError, err)
if isTemporaryError(err) {
log.Error(ctx).Err(err).
Str("user_id", s.GetUserId()).
Expand All @@ -374,6 +378,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string

err = mgr.cfg.Load().authenticator.UpdateUserInfo(ctx, FromOAuthToken(s.OauthToken), &s)
metrics.RecordIdentityManagerUserRefresh(ctx, err)
mgr.recordLastError(metrics_ids.IdentityManagerLastUserRefreshError, err)
if isTemporaryError(err) {
log.Error(ctx).Err(err).
Str("user_id", s.GetUserId()).
Expand Down Expand Up @@ -426,6 +431,7 @@ func (mgr *Manager) refreshUser(ctx context.Context, userID string) {

err := mgr.cfg.Load().authenticator.UpdateUserInfo(ctx, FromOAuthToken(s.OauthToken), &u)
metrics.RecordIdentityManagerUserRefresh(ctx, err)
mgr.recordLastError(metrics_ids.IdentityManagerLastUserRefreshError, err)
if isTemporaryError(err) {
log.Error(ctx).Err(err).
Str("user_id", s.GetUserId()).
Expand Down Expand Up @@ -552,6 +558,21 @@ func (mgr *Manager) reset() {
mgr.users = userCollection{BTree: btree.New(8)}
}

func (mgr *Manager) recordLastError(id string, err error) {
if err == nil {
return
}
evtMgr := mgr.cfg.Load().eventMgr
if evtMgr == nil {
return
}
evtMgr.Dispatch(&events.LastError{
Time: timestamppb.Now(),
Message: err.Error(),
Id: id,
})
}

func isTemporaryError(err error) bool {
if err == nil {
return false
Expand Down

0 comments on commit db42607

Please sign in to comment.