Skip to content

Commit

Permalink
zero: managed mode controller (#4459)
Browse files Browse the repository at this point in the history
  • Loading branch information
wasaga committed Aug 17, 2023
1 parent 3b65049 commit d5ef01d
Show file tree
Hide file tree
Showing 11 changed files with 417 additions and 2 deletions.
8 changes: 7 additions & 1 deletion cmd/pomerium/main.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/version"
zero_cmd "github.com/pomerium/pomerium/internal/zero/cmd"
"github.com/pomerium/pomerium/pkg/cmd/pomerium"
"github.com/pomerium/pomerium/pkg/envoy/files"
)
Expand All @@ -30,7 +31,12 @@ func main() {
}

ctx := context.Background()
if err := run(ctx); !errors.Is(err, context.Canceled) {
runFn := run
if zero_cmd.IsManagedMode() {
runFn = zero_cmd.Run
}

if err := runFn(ctx); err != nil && !errors.Is(err, context.Canceled) {
log.Fatal().Err(err).Msg("cmd/pomerium")
}
log.Info(ctx).Msg("cmd/pomerium: exiting")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -40,6 +40,7 @@ require (
github.com/jackc/pgx/v5 v5.4.2
github.com/klauspost/compress v1.16.7
github.com/martinlindhe/base36 v1.1.1
github.com/mattn/go-isatty v0.0.19
github.com/mholt/acmez v1.2.0
github.com/minio/minio-go/v7 v7.0.61
github.com/mitchellh/hashstructure/v2 v2.0.2
Expand Down Expand Up @@ -188,7 +189,6 @@ require (
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.55 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
Expand Down
83 changes: 83 additions & 0 deletions internal/zero/cmd/command.go
@@ -0,0 +1,83 @@
// Package cmd implements the pomerium zero command.
package cmd

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/mattn/go-isatty"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/pomerium/pomerium/internal/zero/controller"
)

// Run runs the pomerium zero command.
func Run(ctx context.Context) error {
err := setupLogger()
if err != nil {
return fmt.Errorf("error setting up logger: %w", err)
}

token := getToken()
if token == "" {
return errors.New("no token provided")
}

return controller.Run(
withInterrupt(ctx),
controller.WithAPIToken(token),
controller.WithClusterAPIEndpoint(getClusterAPIEndpoint()),
controller.WithConnectAPIEndpoint(getConnectAPIEndpoint()),
)
}

// IsManagedMode returns true if Pomerium should start in managed mode using this command.
func IsManagedMode() bool {
return getToken() != ""
}

func withInterrupt(ctx context.Context) context.Context {
ctx, cancel := context.WithCancel(ctx)
go func(ctx context.Context) {
ch := make(chan os.Signal, 2)
defer signal.Stop(ch)

signal.Notify(ch, os.Interrupt)
signal.Notify(ch, syscall.SIGTERM)

select {
case sig := <-ch:
log.Ctx(ctx).Info().Str("signal", sig.String()).Msg("quitting...")
case <-ctx.Done():
}
cancel()
}(ctx)
return ctx
}

func setupLogger() error {
if isatty.IsTerminal(os.Stdin.Fd()) {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
} else {
log.Logger = zerolog.New(os.Stderr)
}

if rawLvl, ok := os.LookupEnv("LOG_LEVEL"); ok {
lvl, err := zerolog.ParseLevel(rawLvl)
if err != nil {
return err
}
log.Logger = log.Logger.Level(lvl)
} else {
log.Logger = log.Logger.Level(zerolog.InfoLevel)
}

// set the default context logger
zerolog.DefaultContextLogger = &log.Logger
return nil
}
13 changes: 13 additions & 0 deletions internal/zero/cmd/env.go
@@ -0,0 +1,13 @@
package cmd

import "os"

const (
// PomeriumZeroTokenEnv is the environment variable name for the API token.
//nolint: gosec
PomeriumZeroTokenEnv = "POMERIUM_ZERO_TOKEN"
)

func getToken() string {
return os.Getenv(PomeriumZeroTokenEnv)
}
21 changes: 21 additions & 0 deletions internal/zero/cmd/env_dev.go
@@ -0,0 +1,21 @@
//go:build !release

package cmd

import "os"

func getConnectAPIEndpoint() string {
connectServerEndpoint := os.Getenv("CONNECT_SERVER_ENDPOINT")
if connectServerEndpoint == "" {
connectServerEndpoint = "http://localhost:8721"
}
return connectServerEndpoint
}

func getClusterAPIEndpoint() string {
clusterAPIEndpoint := os.Getenv("CLUSTER_API_ENDPOINT")
if clusterAPIEndpoint == "" {
clusterAPIEndpoint = "http://localhost:8720/cluster/v1"
}
return clusterAPIEndpoint
}
11 changes: 11 additions & 0 deletions internal/zero/cmd/env_release.go
@@ -0,0 +1,11 @@
//go:build release

package cmd

func getConnectAPIEndpoint() string {
return "https://connect.pomerium.com"
}

func getClusterAPIEndpoint() string {
return "https://console.pomerium.com/cluster/v1"
}
86 changes: 86 additions & 0 deletions internal/zero/controller/config.go
@@ -0,0 +1,86 @@
package controller

import "time"

// Option configures a controller.
type Option func(*controllerConfig)

type controllerConfig struct {
apiToken string
clusterAPIEndpoint string
connectAPIEndpoint string

tmpDir string
bootstrapConfigFileName string

reconcilerLeaseDuration time.Duration
databrokerRequestTimeout time.Duration
}

// WithTmpDir sets the temporary directory to use.
func WithTmpDir(dir string) Option {
return func(c *controllerConfig) {
c.tmpDir = dir
}
}

// WithClusterAPIEndpoint sets the endpoint to use for the cluster API
func WithClusterAPIEndpoint(endpoint string) Option {
return func(c *controllerConfig) {
c.clusterAPIEndpoint = endpoint
}
}

// WithConnectAPIEndpoint sets the endpoint to use for the connect API
func WithConnectAPIEndpoint(endpoint string) Option {
return func(c *controllerConfig) {
c.connectAPIEndpoint = endpoint
}
}

// WithAPIToken sets the API token to use for authentication.
func WithAPIToken(token string) Option {
return func(c *controllerConfig) {
c.apiToken = token
}
}

// WithBootstrapConfigFileName sets the name of the file to store the bootstrap config in.
func WithBootstrapConfigFileName(name string) Option {
return func(c *controllerConfig) {
c.bootstrapConfigFileName = name
}
}

// WithDatabrokerLeaseDuration sets the lease duration for the
func WithDatabrokerLeaseDuration(duration time.Duration) Option {
return func(c *controllerConfig) {
c.reconcilerLeaseDuration = duration
}
}

// WithDatabrokerRequestTimeout sets the timeout for databroker requests.
func WithDatabrokerRequestTimeout(timeout time.Duration) Option {
return func(c *controllerConfig) {
c.databrokerRequestTimeout = timeout
}
}

func newControllerConfig(opts ...Option) *controllerConfig {
c := new(controllerConfig)

for _, opt := range []Option{
WithClusterAPIEndpoint("https://console.pomerium.com/cluster/v1"),
WithConnectAPIEndpoint("https://connect.pomerium.com"),
WithBootstrapConfigFileName("/var/cache/pomerium-bootstrap.dat"),
WithDatabrokerLeaseDuration(time.Second * 30),
WithDatabrokerRequestTimeout(time.Second * 30),
} {
opt(c)
}

for _, opt := range opts {
opt(c)
}
return c
}
99 changes: 99 additions & 0 deletions internal/zero/controller/controller.go
@@ -0,0 +1,99 @@
// Package controller implements Pomerium managed mode
package controller

import (
"context"
"errors"
"fmt"

"golang.org/x/sync/errgroup"

"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/zero/bootstrap"
"github.com/pomerium/pomerium/pkg/cmd/pomerium"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
sdk "github.com/pomerium/zero-sdk"
)

// Run runs Pomerium is managed mode using the provided token.
func Run(ctx context.Context, opts ...Option) error {
c := controller{cfg: newControllerConfig(opts...)}
eg, ctx := errgroup.WithContext(ctx)

err := c.initAPI(ctx)
if err != nil {
return fmt.Errorf("init api: %w", err)
}

src, err := bootstrap.New([]byte(c.cfg.apiToken))
if err != nil {
return fmt.Errorf("error creating bootstrap config: %w", err)
}
c.bootstrapConfig = src

err = c.InitDatabrokerClient(ctx, src.GetConfig())
if err != nil {
return fmt.Errorf("init databroker client: %w", err)
}

eg.Go(func() error { return run(ctx, "connect", c.runConnect, nil) })
eg.Go(func() error { return run(ctx, "zero-bootstrap", c.runBootstrap, nil) })
eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore, src.WaitReady) })
eg.Go(func() error { return run(ctx, "zero-reconciler", c.RunReconciler, src.WaitReady) })
eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog, nil) })
return eg.Wait()
}

type controller struct {
cfg *controllerConfig

api *sdk.API

bootstrapConfig *bootstrap.Source

databrokerClient databroker.DataBrokerServiceClient
}

func (c *controller) initAPI(ctx context.Context) error {
api, err := sdk.NewAPI(ctx,
sdk.WithClusterAPIEndpoint(c.cfg.clusterAPIEndpoint),
sdk.WithAPIToken(c.cfg.apiToken),
sdk.WithConnectAPIEndpoint(c.cfg.connectAPIEndpoint),
)
if err != nil {
return fmt.Errorf("error initializing cloud api: %w", err)
}

c.api = api

return nil
}

func run(ctx context.Context, name string, runFn func(context.Context) error, waitFn func(context.Context) error) error {
if waitFn != nil {
log.Ctx(ctx).Info().Str("name", name).Msg("waiting for initial configuration")
err := waitFn(ctx)
if err != nil {
return fmt.Errorf("%s: error waiting for initial configuration: %w", name, err)
}
}

log.Ctx(ctx).Info().Str("name", name).Msg("starting")
err := runFn(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("%s: %w", name, err)
}
return nil
}

func (c *controller) runBootstrap(ctx context.Context) error {
return c.bootstrapConfig.Run(ctx, c.api, c.cfg.bootstrapConfigFileName)
}

func (c *controller) runPomeriumCore(ctx context.Context) error {
return pomerium.Run(ctx, c.bootstrapConfig)
}

func (c *controller) runConnect(ctx context.Context) error {
return c.api.Connect(ctx)
}

0 comments on commit d5ef01d

Please sign in to comment.