Skip to content

Commit

Permalink
identity: batch directory updates (#3411)
Browse files Browse the repository at this point in the history
* identity: batch directory updates

* add batch details to log message
  • Loading branch information
calebdoxsey committed Jun 8, 2022
1 parent 493148b commit a7bd284
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 87 deletions.
125 changes: 38 additions & 87 deletions internal/identity/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ package manager
import (
"context"
"errors"
"fmt"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/google/btree"
"github.com/rs/zerolog"
"golang.org/x/oauth2"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

Expand All @@ -28,10 +26,6 @@ import (
"github.com/pomerium/pomerium/pkg/protoutil"
)

const (
dataBrokerParallelism = 10
)

// Authenticator is an identity.Provider with only the methods needed by the manager.
type Authenticator interface {
Refresh(context.Context, *oauth2.Token, identity.State) (*oauth2.Token, error)
Expand Down Expand Up @@ -59,8 +53,6 @@ type Manager struct {

directoryBackoff *backoff.ExponentialBackOff
directoryNextRefresh time.Time

dataBrokerSemaphore *semaphore.Weighted
}

// New creates a new identity manager.
Expand All @@ -72,8 +64,6 @@ func New(

sessionScheduler: scheduler.New(),
userScheduler: scheduler.New(),

dataBrokerSemaphore: semaphore.NewWeighted(dataBrokerParallelism),
}
mgr.directoryBackoff = backoff.NewExponentialBackOff()
mgr.directoryBackoff.MaxElapsedTime = 0
Expand Down Expand Up @@ -240,35 +230,22 @@ func (mgr *Manager) refreshDirectoryUserGroups(ctx context.Context) (nextRefresh
}

func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*directory.Group) {
eg, ctx := errgroup.WithContext(ctx)

lookup := map[string]*directory.Group{}
for _, dg := range directoryGroups {
lookup[dg.GetId()] = dg
}

var records []*databroker.Record

for groupID, newDG := range lookup {
curDG, ok := mgr.directoryGroups[groupID]
if !ok || !proto.Equal(newDG, curDG) {
id := newDG.GetId()
any := protoutil.NewAny(newDG)
eg.Go(func() error {
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
return err
}
defer mgr.dataBrokerSemaphore.Release(1)

_, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, &databroker.PutRequest{
Records: []*databroker.Record{{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
}},
})
if err != nil {
return fmt.Errorf("failed to update directory group: %s", id)
}
return nil
records = append(records, &databroker.Record{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
})
}
}
Expand All @@ -278,62 +255,43 @@ func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*director
if !ok {
id := curDG.GetId()
any := protoutil.NewAny(curDG)
eg.Go(func() error {
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
return err
}
defer mgr.dataBrokerSemaphore.Release(1)

_, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, &databroker.PutRequest{
Records: []*databroker.Record{{
Type: any.GetTypeUrl(),
Id: id,
DeletedAt: timestamppb.Now(),
}},
})
if err != nil {
return fmt.Errorf("failed to delete directory group: %s", id)
}
return nil
records = append(records, &databroker.Record{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
DeletedAt: timestamppb.New(mgr.cfg.Load().now()),
})
}
}

if err := eg.Wait(); err != nil {
log.Warn(ctx).Err(err).Msg("manager: failed to merge groups")
for i, batch := range databroker.OptimumPutRequestsFromRecords(records) {
_, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, batch)
if err != nil {
log.Warn(ctx).Err(err).
Int("batch", i).
Int("record-count", len(batch.GetRecords())).
Msg("manager: failed to update groups")
}
}
}

func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory.User) {
eg, ctx := errgroup.WithContext(ctx)

lookup := map[string]*directory.User{}
for _, du := range directoryUsers {
lookup[du.GetId()] = du
}

var records []*databroker.Record

for userID, newDU := range lookup {
curDU, ok := mgr.directoryUsers[userID]
if !ok || !proto.Equal(newDU, curDU) {
id := newDU.GetId()
any := protoutil.NewAny(newDU)
eg.Go(func() error {
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
return err
}
defer mgr.dataBrokerSemaphore.Release(1)

client := mgr.cfg.Load().dataBrokerClient
if _, err := client.Put(ctx, &databroker.PutRequest{
Records: []*databroker.Record{{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
}},
}); err != nil {
return fmt.Errorf("failed to update directory user: %s", id)
}
return nil
records = append(records, &databroker.Record{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
})
}
}
Expand All @@ -343,30 +301,23 @@ func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory.
if !ok {
id := curDU.GetId()
any := protoutil.NewAny(curDU)
eg.Go(func() error {
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
return err
}
defer mgr.dataBrokerSemaphore.Release(1)

client := mgr.cfg.Load().dataBrokerClient
if _, err := client.Put(ctx, &databroker.PutRequest{
Records: []*databroker.Record{{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
DeletedAt: timestamppb.Now(),
}},
}); err != nil {
return fmt.Errorf("failed to delete directory user (%s): %w", id, err)
}
return nil
records = append(records, &databroker.Record{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
DeletedAt: timestamppb.New(mgr.cfg.Load().now()),
})
}
}

if err := eg.Wait(); err != nil {
log.Warn(ctx).Err(err).Msg("manager: failed to merge users")
for i, batch := range databroker.OptimumPutRequestsFromRecords(records) {
_, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, batch)
if err != nil {
log.Warn(ctx).Err(err).
Int("batch", i).
Int("record-count", len(batch.GetRecords())).
Msg("manager: failed to update users")
}
}
}

Expand Down
10 changes: 10 additions & 0 deletions internal/identity/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"

"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/databroker/mock_databroker"
"github.com/pomerium/pomerium/pkg/grpc/session"
"github.com/pomerium/pomerium/pkg/grpc/user"
"github.com/pomerium/pomerium/pkg/protoutil"
Expand All @@ -30,12 +32,15 @@ func (mock mockProvider) UserGroups(ctx context.Context) ([]*directory.Group, []
}

func TestManager_onUpdateRecords(t *testing.T) {
ctrl := gomock.NewController(t)

ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10)
defer clearTimeout()

now := time.Now()

mgr := New(
WithDataBrokerClient(mock_databroker.NewMockDataBrokerServiceClient(ctrl)),
WithDirectoryProvider(mockProvider{}),
WithGroupRefreshInterval(time.Hour),
WithNow(func() time.Time {
Expand Down Expand Up @@ -67,12 +72,17 @@ func TestManager_onUpdateRecords(t *testing.T) {
}

func TestManager_refreshDirectoryUserGroups(t *testing.T) {
ctrl := gomock.NewController(t)

ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10)
defer clearTimeout()

t.Run("backoff", func(t *testing.T) {
cnt := 0
client := mock_databroker.NewMockDataBrokerServiceClient(ctrl)
client.EXPECT().Put(gomock.Any(), gomock.Any()).AnyTimes()
mgr := New(
WithDataBrokerClient(client),
WithDirectoryProvider(mockProvider{
userGroups: func(ctx context.Context) ([]*directory.Group, []*directory.User, error) {
cnt++
Expand Down

0 comments on commit a7bd284

Please sign in to comment.