Skip to content

Commit

Permalink
zero: group funcs that need run within a lease (#4862)
Browse files Browse the repository at this point in the history
  • Loading branch information
wasaga committed Dec 21, 2023
1 parent faa2a86 commit 07d6087
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 60 deletions.
20 changes: 16 additions & 4 deletions internal/zero/controller/controller.go
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pomerium/pomerium/internal/zero/analytics"
sdk "github.com/pomerium/pomerium/internal/zero/api"
"github.com/pomerium/pomerium/internal/zero/bootstrap"
"github.com/pomerium/pomerium/internal/zero/leaser"
"github.com/pomerium/pomerium/internal/zero/reconciler"
"github.com/pomerium/pomerium/internal/zero/reporter"
"github.com/pomerium/pomerium/pkg/cmd/pomerium"
Expand Down Expand Up @@ -42,12 +43,10 @@ func Run(ctx context.Context, opts ...Option) error {
}

eg.Go(func() error { return run(ctx, "connect", c.runConnect, nil) })
eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog, nil) })
eg.Go(func() error { return run(ctx, "zero-bootstrap", c.runBootstrap, nil) })
eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore, src.WaitReady) })
eg.Go(func() error { return run(ctx, "zero-reconciler", c.runReconciler, src.WaitReady) })
eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog, nil) })
eg.Go(func() error { return run(ctx, "zero-analytics", c.runAnalytics, src.WaitReady) })
eg.Go(func() error { return run(ctx, "zero-reporter", c.runReporter, src.WaitReady) })
eg.Go(func() error { return c.runZeroControlLoop(ctx, src.WaitReady) })
return eg.Wait()
}

Expand Down Expand Up @@ -113,6 +112,19 @@ func (c *controller) runConnect(ctx context.Context) error {
return c.api.Connect(ctx)
}

func (c *controller) runZeroControlLoop(ctx context.Context, waitFn func(context.Context) error) error {
err := waitFn(ctx)
if err != nil {
return fmt.Errorf("error waiting for initial configuration: %w", err)
}

return leaser.Run(ctx, c.databrokerClient,
c.runReconciler,
c.runAnalytics,
c.runReporter,
)
}

func (c *controller) runReconciler(ctx context.Context) error {
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("service", "zero-reconciler")
Expand Down
49 changes: 49 additions & 0 deletions internal/zero/leaser/leaser.go
@@ -0,0 +1,49 @@
// Package leaser groups all Zero services that should run within a lease.
package leaser

import (
"context"
"time"

"golang.org/x/sync/errgroup"

"github.com/pomerium/pomerium/pkg/grpc/databroker"
)

type service struct {
client databroker.DataBrokerServiceClient
funcs []func(ctx context.Context) error
}

// GetDataBrokerServiceClient implements the databroker.LeaseHandler interface.
func (c *service) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
return c.client
}

// RunLeased implements the databroker.LeaseHandler interface.
func (c *service) RunLeased(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)
for _, fn := range c.funcs {
fn := fn
eg.Go(func() error {
return fn(ctx)
})
}
return eg.Wait()
}

// Run runs services within a lease
func Run(
ctx context.Context,
client databroker.DataBrokerServiceClient,
funcs ...func(ctx context.Context) error,
) error {
srv := &service{
client: client,
funcs: funcs,
}
leaser := databroker.NewLeaser("zero-ctrl", time.Second*30, srv)
return RunWithRestart(ctx, func(ctx context.Context) error {
return leaser.Run(ctx)
}, srv.databrokerChangeMonitor)
}
37 changes: 37 additions & 0 deletions internal/zero/leaser/monitor.go
@@ -0,0 +1,37 @@
package leaser

import (
"context"
"fmt"

"github.com/pomerium/pomerium/pkg/grpc/databroker"
)

const typeStr = "pomerium.io/zero/leaser"

// databrokerChangeMonitor runs infinite sync loop to see if there is any change in databroker
// it doesn't really syncs anything, just checks if the underlying databroker has changed
func (c *service) databrokerChangeMonitor(ctx context.Context) error {
_, recordVersion, serverVersion, err := databroker.InitialSync(ctx, c.GetDataBrokerServiceClient(), &databroker.SyncLatestRequest{
Type: typeStr,
})
if err != nil {
return fmt.Errorf("error during initial sync: %w", err)
}

stream, err := c.GetDataBrokerServiceClient().Sync(ctx, &databroker.SyncRequest{
Type: typeStr,
ServerVersion: serverVersion,
RecordVersion: recordVersion,
})
if err != nil {
return fmt.Errorf("error calling sync: %w", err)
}

for {
_, err := stream.Recv()
if err != nil {
return fmt.Errorf("error receiving record: %w", err)
}
}
}
@@ -1,10 +1,15 @@
package reconciler
package leaser

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/cenkalti/backoff/v4"

"github.com/pomerium/pomerium/internal/log"
)

// RunWithRestart executes execFn.
Expand Down Expand Up @@ -44,8 +49,15 @@ func restartContexts(
contexts chan<- context.Context,
restartFn func(context.Context) error,
) {
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = 0 // never stop

ticker := time.NewTicker(bo.InitialInterval)
defer ticker.Stop()

defer close(contexts)
for base.Err() == nil {
start := time.Now()
ctx, cancel := context.WithCancelCause(base)
select {
case contexts <- ctx:
Expand All @@ -55,6 +67,20 @@ func restartContexts(
cancel(fmt.Errorf("parent context canceled: %w", base.Err()))
return
}

if time.Since(start) > bo.MaxInterval {
bo.Reset()
}
next := bo.NextBackOff()
ticker.Reset(next)

log.Ctx(ctx).Info().Msgf("restarting zero control loop in %s", next.String())

select {
case <-base.Done():
return
case <-ticker.C:
}
}
}

Expand Down
@@ -1,4 +1,4 @@
package reconciler_test
package leaser_test

import (
"context"
Expand All @@ -8,7 +8,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/pomerium/pomerium/internal/zero/reconciler"
"github.com/pomerium/pomerium/internal/zero/leaser"
)

func TestRestart(t *testing.T) {
Expand All @@ -20,7 +20,7 @@ func TestRestart(t *testing.T) {

errExpected := errors.New("execFn error")
count := 0
err := reconciler.RunWithRestart(context.Background(),
err := leaser.RunWithRestart(context.Background(),
func(context.Context) error {
count++
if count == 1 {
Expand All @@ -40,7 +40,7 @@ func TestRestart(t *testing.T) {
t.Parallel()

count := 0
err := reconciler.RunWithRestart(context.Background(),
err := leaser.RunWithRestart(context.Background(),
func(context.Context) error {
count++
if count == 1 {
Expand All @@ -63,7 +63,7 @@ func TestRestart(t *testing.T) {
t.Cleanup(cancel)

ready := make(chan struct{})
err := reconciler.RunWithRestart(ctx,
err := leaser.RunWithRestart(ctx,
func(context.Context) error {
<-ready
cancel()
Expand All @@ -87,7 +87,7 @@ func TestRestart(t *testing.T) {
errExpected := errors.New("execFn error")
count := 0
ready := make(chan struct{})
err := reconciler.RunWithRestart(ctx,
err := leaser.RunWithRestart(ctx,
func(ctx context.Context) error {
count++
if count == 1 { // wait for us to be restarted
Expand Down
49 changes: 0 additions & 49 deletions internal/zero/reconciler/service.go
Expand Up @@ -7,15 +7,13 @@ package reconciler

import (
"context"
"fmt"
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"

"github.com/pomerium/pomerium/internal/atomicutil"
connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)

type service struct {
Expand All @@ -42,60 +40,13 @@ func Run(ctx context.Context, opts ...Option) error {
}
c.periodicUpdateInterval.Store(config.checkForUpdateIntervalWhenDisconnected)

return c.runMainLoop(ctx)
}

// RunLeased implements the databroker.LeaseHandler interface
func (c *service) RunLeased(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error { return c.watchUpdates(ctx) })
eg.Go(func() error { return c.SyncLoop(ctx) })

return eg.Wait()
}

// GetDataBrokerServiceClient implements the databroker.LeaseHandler interface.
func (c *service) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
return c.config.databrokerClient
}

func (c *service) runMainLoop(ctx context.Context) error {
leaser := databroker.NewLeaser("zero-reconciler", time.Second*30, c)
return RunWithRestart(ctx, func(ctx context.Context) error {
return leaser.Run(ctx)
}, c.databrokerChangeMonitor)
}

// databrokerChangeMonitor runs infinite sync loop to see if there is any change in databroker
func (c *service) databrokerChangeMonitor(ctx context.Context) error {
_, recordVersion, serverVersion, err := databroker.InitialSync(ctx, c.GetDataBrokerServiceClient(), &databroker.SyncLatestRequest{
Type: BundleCacheEntryRecordType,
})
if err != nil {
return fmt.Errorf("error during initial sync: %w", err)
}

stream, err := c.GetDataBrokerServiceClient().Sync(ctx, &databroker.SyncRequest{
Type: BundleCacheEntryRecordType,
ServerVersion: serverVersion,
RecordVersion: recordVersion,
})
if err != nil {
return fmt.Errorf("error calling sync: %w", err)
}

for {
_, err := stream.Recv()
if err != nil {
return fmt.Errorf("error receiving record: %w", err)
}
}
}

// run is a main control loop.
// it is very simple and sequential download and reconcile.
// it may be later optimized by splitting between download and reconciliation process,
// as we would get more resource bundles beyond the config.
func (c *service) watchUpdates(ctx context.Context) error {
return c.config.api.Watch(ctx,
connect_mux.WithOnConnected(func(ctx context.Context) {
Expand Down

0 comments on commit 07d6087

Please sign in to comment.