Skip to content

Commit

Permalink
feat: use circular buffer's new persistence option for machine logs
Browse files Browse the repository at this point in the history
Switch from the old machine log storage approach to using the new persistence feature in the go-circular: siderolabs/go-circular#4

Implement a migration code to migrate from old format to the new format when a machine log buffer is initialized.

Move some hardcoded log buffer settings into config (CLI args).

Use a default jitter of 0.1 (10+-1 minutes).

Rework the CLI arg names to be consistent.

Closes #118.

Signed-off-by: Utku Ozdemir <utku.ozdemir@siderolabs.com>
  • Loading branch information
utkuozdemir committed Jun 10, 2024
1 parent 7eec6b9 commit b7a0620
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 471 deletions.
41 changes: 36 additions & 5 deletions cmd/omni/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,16 @@ func runWithState(logger *zap.Logger) func(context.Context, state.State, *virtua
}

machineMap := siderolink.NewMachineMap(siderolink.NewStateStorage(omniRuntime.State()))
logHandler := siderolink.NewLogHandler(

logHandler, err := siderolink.NewLogHandler(
machineMap,
resourceState,
&config.Config.LogStorage,
&config.Config.MachineLogConfig,
logger.With(logging.Component("siderolink_log_handler")),
)
if err != nil {
return fmt.Errorf("failed to set up log handler: %w", err)
}

talosRuntime := talos.New(talosClientFactory, logger)

Expand Down Expand Up @@ -288,6 +292,7 @@ func main() {
}
}

//nolint:maintidx
func init() {
rootCmd.Flags().BoolVar(&rootCmdArgs.debug, "debug", false, "enable debug logs.")
rootCmd.Flags().StringVar(&rootCmdArgs.bindAddress, "bind-addr", "0.0.0.0:8080", "start HTTP server on the defined address.")
Expand Down Expand Up @@ -329,9 +334,35 @@ func init() {
rootCmd.Flags().IntVar(&config.Config.LoadBalancer.MaxPort, "lb-max-port", config.Config.LoadBalancer.MaxPort, "cluster load balancer port range max value.")
rootCmd.Flags().IntVar(&config.Config.LogServerPort, "log-server-port", config.Config.LogServerPort, "port for TCP log server")

rootCmd.Flags().BoolVar(&config.Config.LogStorage.Enabled, "log-storage-enabled", config.Config.LogStorage.Enabled, "enable log storage")
rootCmd.Flags().StringVar(&config.Config.LogStorage.Path, "log-storage-path", config.Config.LogStorage.Path, "path of the directory for storing logs")
rootCmd.Flags().DurationVar(&config.Config.LogStorage.FlushPeriod, "log-storage-flush-period", config.Config.LogStorage.FlushPeriod, "period for flushing logs to disk")
rootCmd.Flags().IntVar(&config.Config.MachineLogConfig.BufferInitialCapacity, "machine-log-buffer-capacity",
config.Config.MachineLogConfig.BufferInitialCapacity, "initial buffer capacity for machine logs in bytes")
rootCmd.Flags().IntVar(&config.Config.MachineLogConfig.BufferMaxCapacity, "machine-log-buffer-max-capacity",
config.Config.MachineLogConfig.BufferMaxCapacity, "max buffer capacity for machine logs in bytes")
rootCmd.Flags().IntVar(&config.Config.MachineLogConfig.BufferSafetyGap, "machine-log-buffer-safe-gap",
config.Config.MachineLogConfig.BufferSafetyGap, "safety gap for machine log buffer in bytes")
rootCmd.Flags().IntVar(&config.Config.MachineLogConfig.NumCompressedChunks, "machine-log-num-compressed-chunks",
config.Config.MachineLogConfig.NumCompressedChunks, "number of compressed log chunks to keep")
rootCmd.Flags().BoolVar(&config.Config.MachineLogConfig.StorageEnabled, "machine-log-storage-enabled",
config.Config.MachineLogConfig.StorageEnabled, "enable machine log storage")
rootCmd.Flags().StringVar(&config.Config.MachineLogConfig.StoragePath, "machine-log-storage-path",
config.Config.MachineLogConfig.StoragePath, "path of the directory for storing machine logs")
rootCmd.Flags().DurationVar(&config.Config.MachineLogConfig.StorageFlushPeriod, "machine-log-storage-flush-period",
config.Config.MachineLogConfig.StorageFlushPeriod, "period for flushing machine logs to disk")
rootCmd.Flags().Float64Var(&config.Config.MachineLogConfig.StorageFlushJitter, "machine-log-storage-flush-jitter",
config.Config.MachineLogConfig.StorageFlushJitter, "jitter for the machine log storage flush period")

// keep the old flags for backwards-compatibility
{
rootCmd.Flags().BoolVar(&config.Config.MachineLogConfig.StorageEnabled, "log-storage-enabled", config.Config.MachineLogConfig.StorageEnabled, "enable machine log storage")
rootCmd.Flags().StringVar(&config.Config.MachineLogConfig.StoragePath, "log-storage-path", config.Config.MachineLogConfig.StoragePath,
"path of the directory for storing machine logs")
rootCmd.Flags().DurationVar(&config.Config.MachineLogConfig.StorageFlushPeriod, "log-storage-flush-period", config.Config.MachineLogConfig.StorageFlushPeriod,
"period for flushing machine logs to disk")

rootCmd.Flags().MarkDeprecated("log-storage-enabled", "use --machine-log-storage-enabled") //nolint:errcheck
rootCmd.Flags().MarkDeprecated("log-storage-path", "use --machine-log-storage-path") //nolint:errcheck
rootCmd.Flags().MarkDeprecated("log-storage-flush-period", "use --machine-log-storage-flush-period") //nolint:errcheck
}

rootCmd.Flags().BoolVar(&config.Config.Auth.Auth0.Enabled, "auth-auth0-enabled", config.Config.Auth.Auth0.Enabled,
"enable Auth0 authentication. Once set to true, it cannot be set back to false.")
Expand Down
32 changes: 22 additions & 10 deletions internal/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Params struct {
LoadBalancer LoadBalancerParams `yaml:"loadbalancer"`
LogServerPort int `yaml:"logServerPort"`

LogStorage LogStorageParams `yaml:"logStorage"`
MachineLogConfig MachineLogConfigParams `yaml:"machineLogConfig"`

Auth AuthParams `yaml:"auth"`

Expand Down Expand Up @@ -182,11 +182,18 @@ type KeyPrunerParams struct {
Interval time.Duration `yaml:"interval"`
}

// LogStorageParams defines log storage configuration.
type LogStorageParams struct {
Path string `yaml:"directory"`
FlushPeriod time.Duration `yaml:"flushPeriod"`
Enabled bool `yaml:"enabled"`
// MachineLogConfigParams defines log storage configuration.
type MachineLogConfigParams struct {
StoragePath string `yaml:"directory"`

BufferInitialCapacity int `yaml:"bufferInitialCapacity"`
BufferMaxCapacity int `yaml:"bufferMaxCapacity"`
BufferSafetyGap int `yaml:"bufferSafetyGap"`
NumCompressedChunks int `yaml:"numCompressedChunks"`

StorageFlushPeriod time.Duration `yaml:"flushPeriod"`
StorageFlushJitter float64 `yaml:"flushJitter"`
StorageEnabled bool `yaml:"enabled"`
}

var (
Expand Down Expand Up @@ -219,10 +226,15 @@ var (
Interval: 10 * time.Minute,
},
LogServerPort: 8092,
LogStorage: LogStorageParams{
Enabled: true,
Path: "_out/logs",
FlushPeriod: 10 * time.Minute,
MachineLogConfig: MachineLogConfigParams{
BufferInitialCapacity: 16384,
BufferMaxCapacity: 131072,
BufferSafetyGap: 256,
NumCompressedChunks: 5,
StorageEnabled: true,
StoragePath: "_out/logs",
StorageFlushPeriod: 10 * time.Minute,
StorageFlushJitter: 0.1,
},
TalosRegistry: consts.TalosRegistry,
KubernetesRegistry: consts.KubernetesRegistry,
Expand Down
74 changes: 16 additions & 58 deletions internal/pkg/siderolink/loghandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ package siderolink
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net/netip"
"time"

"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/optional"
Expand All @@ -25,32 +23,28 @@ import (
)

// NewLogHandler returns a new LogHandler.
func NewLogHandler(machineMap *MachineMap, omniState state.State, storageConfig *config.LogStorageParams, logger *zap.Logger) *LogHandler {
storage := optional.None[*LogStorage]()

if storageConfig.Enabled {
storage = optional.Some(NewLogStorage(storageConfig.Path))
func NewLogHandler(machineMap *MachineMap, omniState state.State, storageConfig *config.MachineLogConfigParams, logger *zap.Logger) (*LogHandler, error) {
cache, err := NewMachineCache(storageConfig, logger)
if err != nil {
return nil, fmt.Errorf("failed to create machine cache: %w", err)
}

cache := NewMachineCache(storage, logger)
handler := LogHandler{
StorageFlushPeriod: storageConfig.FlushPeriod,
Map: machineMap,
OmniState: omniState,
Cache: cache,
logger: logger,
Map: machineMap,
OmniState: omniState,
Cache: cache,
logger: logger,
}

return &handler
return &handler, nil
}

// LogHandler stores a map of machines to their circular log buffers.
type LogHandler struct {
OmniState state.State
Map *MachineMap
logger *zap.Logger
Cache *MachineCache
StorageFlushPeriod time.Duration
OmniState state.State
Map *MachineMap
logger *zap.Logger
Cache *MachineCache
}

// Start starts the LogHandler.
Expand All @@ -68,50 +62,14 @@ func (h *LogHandler) Start(ctx context.Context) error {
return err
}

var tickerCh <-chan time.Time

var storagePath string

storage, storageEnabled := h.Cache.Storage.Get()
if storageEnabled {
ticker := time.NewTicker(h.StorageFlushPeriod)
tickerCh = ticker.C

defer ticker.Stop()

storagePath = storage.Path
}

for {
select {
case <-ctx.Done():
if storageEnabled {
h.logger.Info("save all log buffers before shutdown", zap.String("storage_path", storagePath))

err := h.Cache.SaveAll()
if err != nil {
h.logger.Error("failed to save all log buffers", zap.Error(err))

return ctx.Err()
}
if err := h.Cache.Close(); err != nil {
h.logger.Error("failed to close machine logs cache", zap.Error(err))
}

if errors.Is(ctx.Err(), context.Canceled) {
return nil
}

return ctx.Err()
case <-tickerCh:
h.logger.Info(
"save all log buffers",
zap.String("storage_path", storagePath),
zap.Duration("period", h.StorageFlushPeriod),
)

err := h.Cache.SaveAll()
if err != nil {
h.logger.Error("failed to save all log buffers", zap.Error(err))
}
return nil
case event := <-eventCh:
switch event.Type {
case state.Created, state.Updated, state.Bootstrapped:
Expand Down
Loading

0 comments on commit b7a0620

Please sign in to comment.