Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions cmd/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ type APIConfig struct {

// MetricsConfig holds metrics endpoint settings.
type MetricsConfig struct {
ListenAddress string `koanf:"listen_address"`
Port int `koanf:"port"`
VMLabelBudget int `koanf:"vm_label_budget"`
ResourceRefreshInterval string `koanf:"resource_refresh_interval"`
ListenAddress string `koanf:"listen_address"`
Port int `koanf:"port"`
VMLabelBudget int `koanf:"vm_label_budget"`
ResourceRefreshInterval string `koanf:"resource_refresh_interval"`
AllocationReconcileInterval string `koanf:"allocation_reconcile_interval"`
}

// OtelConfig holds OpenTelemetry settings.
Expand Down Expand Up @@ -315,10 +316,11 @@ func defaultConfig() *Config {
},

Metrics: MetricsConfig{
ListenAddress: "127.0.0.1",
Port: 9464,
VMLabelBudget: 200,
ResourceRefreshInterval: "120s",
ListenAddress: "127.0.0.1",
Port: 9464,
VMLabelBudget: 200,
ResourceRefreshInterval: "120s",
AllocationReconcileInterval: "120s",
},

Otel: OtelConfig{
Expand Down Expand Up @@ -507,6 +509,16 @@ func (c *Config) Validate() error {
if interval <= 0 {
return fmt.Errorf("metrics.resource_refresh_interval must be positive, got %q", c.Metrics.ResourceRefreshInterval)
}
if strings.TrimSpace(c.Metrics.AllocationReconcileInterval) == "" {
return fmt.Errorf("metrics.allocation_reconcile_interval must not be empty")
}
reconcileInterval, err := time.ParseDuration(c.Metrics.AllocationReconcileInterval)
if err != nil {
return fmt.Errorf("metrics.allocation_reconcile_interval must be a valid duration, got %q: %w", c.Metrics.AllocationReconcileInterval, err)
}
if reconcileInterval <= 0 {
return fmt.Errorf("metrics.allocation_reconcile_interval must be positive, got %q", c.Metrics.AllocationReconcileInterval)
}
if c.Otel.MetricExportInterval != "" {
if _, err := time.ParseDuration(c.Otel.MetricExportInterval); err != nil {
return fmt.Errorf("otel.metric_export_interval must be a valid duration, got %q: %w", c.Otel.MetricExportInterval, err)
Expand Down
33 changes: 33 additions & 0 deletions cmd/api/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func TestDefaultConfigIncludesMetricsSettings(t *testing.T) {
if cfg.Metrics.ResourceRefreshInterval != "120s" {
t.Fatalf("expected default metrics.resource_refresh_interval to be 120s, got %q", cfg.Metrics.ResourceRefreshInterval)
}
if cfg.Metrics.AllocationReconcileInterval != "120s" {
t.Fatalf("expected default metrics.allocation_reconcile_interval to be 120s, got %q", cfg.Metrics.AllocationReconcileInterval)
}
if cfg.Otel.MetricExportInterval != "60s" {
t.Fatalf("expected default otel.metric_export_interval to be 60s, got %q", cfg.Otel.MetricExportInterval)
}
Expand All @@ -50,6 +53,7 @@ func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) {
t.Setenv("METRICS__PORT", "9999")
t.Setenv("METRICS__VM_LABEL_BUDGET", "350")
t.Setenv("METRICS__RESOURCE_REFRESH_INTERVAL", "30s")
t.Setenv("METRICS__ALLOCATION_RECONCILE_INTERVAL", "45s")
t.Setenv("OTEL__METRIC_EXPORT_INTERVAL", "15s")
t.Setenv("OTEL__SUCCESSFUL_GET_SAMPLE_RATIO", "0.25")
t.Setenv("INSTANCES__LIFECYCLE_EVENT_BUFFER_SIZE", "512")
Expand Down Expand Up @@ -77,6 +81,9 @@ func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) {
if cfg.Metrics.ResourceRefreshInterval != "30s" {
t.Fatalf("expected metrics.resource_refresh_interval override, got %q", cfg.Metrics.ResourceRefreshInterval)
}
if cfg.Metrics.AllocationReconcileInterval != "45s" {
t.Fatalf("expected metrics.allocation_reconcile_interval override, got %q", cfg.Metrics.AllocationReconcileInterval)
}
if cfg.Otel.MetricExportInterval != "15s" {
t.Fatalf("expected otel.metric_export_interval override, got %q", cfg.Otel.MetricExportInterval)
}
Expand Down Expand Up @@ -154,6 +161,32 @@ func TestValidateRejectsInvalidResourceRefreshInterval(t *testing.T) {
}
}

// TestValidateRejectsInvalidAllocationReconcileInterval checks that Validate
// returns an error when metrics.allocation_reconcile_interval is empty, is not
// a parseable duration, or is zero. Every case starts from a fresh default
// config so only the field under test differs.
func TestValidateRejectsInvalidAllocationReconcileInterval(t *testing.T) {
	rejected := []struct {
		interval string
		failure  string
	}{
		{"", "expected validation error for empty allocation reconcile interval"},
		{"not-a-duration", "expected validation error for invalid allocation reconcile interval"},
		{"0s", "expected validation error for non-positive allocation reconcile interval"},
	}

	for _, tc := range rejected {
		cfg := defaultConfig()
		cfg.Metrics.AllocationReconcileInterval = tc.interval

		if err := cfg.Validate(); err == nil {
			t.Fatal(tc.failure)
		}
	}
}

func TestLoadUsesConfiguredLifecycleEventBufferSize(t *testing.T) {
tmp := t.TempDir()
cfgPath := filepath.Join(tmp, "config.yaml")
Expand Down
9 changes: 9 additions & 0 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,18 @@ func run() error {
if err != nil {
return fmt.Errorf("invalid metrics resource refresh interval %q: %w", app.Config.Metrics.ResourceRefreshInterval, err)
}
allocationReconcileInterval, err := time.ParseDuration(app.Config.Metrics.AllocationReconcileInterval)
if err != nil {
return fmt.Errorf("invalid metrics allocation reconcile interval %q: %w", app.Config.Metrics.AllocationReconcileInterval, err)
}
if err := app.ResourceManager.StartMonitoring(ctx, otelProvider.Meter, resourceRefreshInterval); err != nil {
return fmt.Errorf("start resource monitoring: %w", err)
}
if reconciler, ok := app.InstanceManager.(interface {
StartAdmissionAllocationReconciler(context.Context, time.Duration)
}); ok {
reconciler.StartAdmissionAllocationReconciler(ctx, allocationReconcileInterval)
}

// Log OTel status
if cfg.Otel.Enabled {
Expand Down
174 changes: 174 additions & 0 deletions lib/images/disk_usage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package images

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
)

// totalReadyImageBytesFromMetadata walks imagesDir and adds up the size of every
// image whose metadata.json reports it as ready.
//
// The size recorded in the metadata is trusted as-is when it is positive; the
// rootfs file itself is not re-validated. The rootfs.* files in the digest
// directory are measured instead when:
//   - metadata.json cannot be read,
//   - metadata.json cannot be decoded, or
//   - the image is ready but records no positive size.
//
// In the first two cases a failed fallback surfaces the original read/decode
// error; in the third it surfaces the fallback error. Images in any other
// status contribute nothing. A missing imagesDir (or entries that vanish during
// the walk) is not an error.
func totalReadyImageBytesFromMetadata(imagesDir string) (int64, error) {
	var sum int64

	visit := func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			if os.IsNotExist(walkErr) {
				return nil
			}
			return walkErr
		}
		if info.IsDir() || info.Name() != "metadata.json" {
			return nil
		}
		digestDir := filepath.Dir(path)

		raw, readErr := os.ReadFile(path)
		if readErr != nil {
			// Unreadable metadata: count whatever rootfs is on disk so usage
			// is not under-reported.
			size, sizeErr := totalRootfsBytesInDigestDir(digestDir)
			if sizeErr != nil {
				return fmt.Errorf("read image metadata %s: %w", path, readErr)
			}
			sum += size
			return nil
		}

		var meta imageMetadata
		if decodeErr := json.Unmarshal(raw, &meta); decodeErr != nil {
			// Malformed metadata: same on-disk fallback as above.
			size, sizeErr := totalRootfsBytesInDigestDir(digestDir)
			if sizeErr != nil {
				return fmt.Errorf("unmarshal image metadata %s: %w", path, decodeErr)
			}
			sum += size
			return nil
		}

		if meta.Status != StatusReady {
			return nil
		}
		if meta.SizeBytes > 0 {
			sum += meta.SizeBytes
			return nil
		}

		// Ready but no recorded size: measure the rootfs directly.
		size, sizeErr := totalRootfsBytesInDigestDir(digestDir)
		if sizeErr != nil {
			return fmt.Errorf("stat ready image rootfs for %s: %w", path, sizeErr)
		}
		sum += size
		return nil
	}

	if walkErr := filepath.Walk(imagesDir, visit); walkErr != nil && !os.IsNotExist(walkErr) {
		return 0, fmt.Errorf("walk images directory: %w", walkErr)
	}
	return sum, nil
}

// totalOCICacheBlobBytesFromFilesystem returns the combined size of every
// non-directory entry found under blobDir. It measures what is on disk rather
// than consulting the OCI layout index, so blobs the index no longer references
// are still counted. A missing blobDir yields 0 with no error.
func totalOCICacheBlobBytesFromFilesystem(blobDir string) (int64, error) {
	var sum int64

	visit := func(_ string, entry os.FileInfo, walkErr error) error {
		switch {
		case walkErr != nil && os.IsNotExist(walkErr):
			// Entry disappeared (or the root never existed); nothing to count.
			return nil
		case walkErr != nil:
			return walkErr
		case !entry.IsDir():
			sum += entry.Size()
		}
		return nil
	}

	walkErr := filepath.Walk(blobDir, visit)
	if walkErr == nil || os.IsNotExist(walkErr) {
		return sum, nil
	}
	return 0, fmt.Errorf("walk OCI cache blobs: %w", walkErr)
}

// getDiskUsageTotals returns the ready-image byte total and the OCI cache byte
// total, in that order.
//
// Cached values are returned when the cache has already been populated.
// Otherwise the totals are computed without holding the lock and stored only if
// nothing else populated the cache in the meantime; the values returned are
// always the ones that ended up in the cache.
func (m *manager) getDiskUsageTotals() (int64, int64, error) {
	m.diskUsageMu.RLock()
	loaded, images, blobs := m.diskUsageLoaded, m.readyImageBytes, m.ociCacheBytes
	m.diskUsageMu.RUnlock()
	if loaded {
		return images, blobs, nil
	}

	// The filesystem walk happens outside the lock so readers are not blocked.
	images, blobs, err := m.computeDiskUsageTotals()
	if err != nil {
		return 0, 0, err
	}

	m.diskUsageMu.Lock()
	defer m.diskUsageMu.Unlock()
	if !m.diskUsageLoaded {
		m.readyImageBytes, m.ociCacheBytes = images, blobs
		m.diskUsageLoaded = true
	}
	// Re-read from the cache: a concurrent writer may have stored its own values.
	return m.readyImageBytes, m.ociCacheBytes, nil
}

// refreshDiskUsageTotals recomputes both totals and overwrites the cached
// values, marking the cache as loaded. If the recomputation fails, the cache is
// left untouched and the error is dropped.
func (m *manager) refreshDiskUsageTotals() {
	images, blobs, err := m.computeDiskUsageTotals()
	if err != nil {
		// Best effort: keep whatever is already cached.
		return
	}

	m.diskUsageMu.Lock()
	defer m.diskUsageMu.Unlock()
	m.readyImageBytes, m.ociCacheBytes, m.diskUsageLoaded = images, blobs, true
}

// computeDiskUsageTotals measures disk usage from the filesystem without
// touching the cache. It returns the ready-image byte total followed by the OCI
// cache blob byte total; on any failure both totals are zero and the error is
// returned unchanged.
func (m *manager) computeDiskUsageTotals() (int64, int64, error) {
	var (
		imageBytes int64
		blobBytes  int64
		err        error
	)

	if imageBytes, err = totalReadyImageBytesFromMetadata(m.paths.ImagesDir()); err != nil {
		return 0, 0, err
	}
	if blobBytes, err = totalOCICacheBlobBytesFromFilesystem(m.paths.OCICacheBlobDir()); err != nil {
		return 0, 0, err
	}
	return imageBytes, blobBytes, nil
}

// totalRootfsBytesInDigestDir adds up the sizes of the non-directory entries
// matching "rootfs.*" directly inside digestDir.
//
// It returns os.ErrNotExist when nothing matches or when the matched entries
// sum to zero bytes. Matches that disappear before they can be stat'ed are
// skipped; any other stat failure is returned as-is.
func totalRootfsBytesInDigestDir(digestDir string) (int64, error) {
	pattern := filepath.Join(digestDir, "rootfs.*")
	matches, globErr := filepath.Glob(pattern)
	switch {
	case globErr != nil:
		return 0, globErr
	case len(matches) == 0:
		return 0, os.ErrNotExist
	}

	var sum int64
	for _, candidate := range matches {
		st, statErr := os.Stat(candidate)
		switch {
		case statErr == nil:
			if !st.IsDir() {
				sum += st.Size()
			}
		case os.IsNotExist(statErr):
			// Removed between the glob and the stat; ignore it.
		default:
			return 0, statErr
		}
	}

	// Zero bytes counted is reported the same way as no rootfs at all.
	if sum == 0 {
		return 0, os.ErrNotExist
	}
	return sum, nil
}
37 changes: 37 additions & 0 deletions lib/images/disk_usage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package images

import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
)

// TestTotalReadyImageBytesFromMetadata_UsesRootfsFallbackForMalformedMetadata
// verifies that a digest directory whose metadata.json is not valid JSON is
// still accounted for by the size of its rootfs file.
func TestTotalReadyImageBytesFromMetadata_UsesRootfsFallbackForMalformedMetadata(t *testing.T) {
	t.Parallel()

	const rootfsContent = "rootfs-data"

	root := t.TempDir()
	imageDir := filepath.Join(root, "docker.io", "library", "alpine", "sha256deadbeef")
	metadataPath := filepath.Join(imageDir, "metadata.json")
	rootfsPath := filepath.Join(imageDir, "rootfs.erofs")

	require.NoError(t, os.MkdirAll(imageDir, 0o755))
	// Deliberately truncated JSON so decoding fails.
	require.NoError(t, os.WriteFile(metadataPath, []byte("{not-json"), 0o644))
	require.NoError(t, os.WriteFile(rootfsPath, []byte(rootfsContent), 0o644))

	got, err := totalReadyImageBytesFromMetadata(root)
	require.NoError(t, err)
	require.Equal(t, int64(len(rootfsContent)), got)
}

// TestTotalReadyImageBytesFromMetadata_UsesRootfsFallbackForReadyImageWithoutSize
// verifies that a ready image whose metadata records a size of zero is counted
// using the size of its rootfs file instead.
func TestTotalReadyImageBytesFromMetadata_UsesRootfsFallbackForReadyImageWithoutSize(t *testing.T) {
	t.Parallel()

	const rootfsContent = "another-rootfs"

	root := t.TempDir()
	imageDir := filepath.Join(root, "docker.io", "library", "alpine", "sha256deadbeef")
	metadataPath := filepath.Join(imageDir, "metadata.json")
	rootfsPath := filepath.Join(imageDir, "rootfs.erofs")

	require.NoError(t, os.MkdirAll(imageDir, 0o755))
	// Valid metadata: status is ready but no size has been recorded.
	require.NoError(t, os.WriteFile(metadataPath, []byte(`{"status":"ready","size_bytes":0}`), 0o644))
	require.NoError(t, os.WriteFile(rootfsPath, []byte(rootfsContent), 0o644))

	got, err := totalReadyImageBytesFromMetadata(root)
	require.NoError(t, err)
	require.Equal(t, int64(len(rootfsContent)), got)
}
Loading
Loading