From 0533544039bdd33558f27308780bd8c3b63976e5 Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Sun, 31 May 2026 23:20:17 -0400 Subject: [PATCH 1/3] Improve Firecracker fork concurrency --- cmd/api/api/instances.go | 10 ++ cmd/api/api/instances_test.go | 37 +++++++ cmd/api/api/snapshots.go | 2 + lib/hypervisor/firecracker/fork.go | 67 +++++++++-- lib/hypervisor/firecracker/fork_test.go | 75 +++++++++++++ lib/hypervisor/firecracker/process.go | 32 ++++-- lib/hypervisor/firecracker/snapshot_state.go | 108 ++++++++++++++++++ lib/hypervisor/snapshot_alias_lock.go | 20 ++++ lib/instances/fork.go | 48 +++++--- lib/instances/fork_test.go | 110 +++++++++++++++++++ lib/instances/manager.go | 32 +++++- lib/instances/snapshot.go | 53 ++++++--- lib/instances/storage.go | 7 ++ 13 files changed, 544 insertions(+), 57 deletions(-) create mode 100644 lib/hypervisor/firecracker/snapshot_state.go create mode 100644 lib/hypervisor/snapshot_alias_lock.go diff --git a/cmd/api/api/instances.go b/cmd/api/api/instances.go index 9d59515d..3656fcd3 100644 --- a/cmd/api/api/instances.go +++ b/cmd/api/api/instances.go @@ -620,6 +620,11 @@ func (s *ApiService) RestoreInstance(ctx context.Context, request oapi.RestoreIn Code: "invalid_state", Message: err.Error(), }, nil + case errors.Is(err, instances.ErrInsufficientResources): + return oapi.RestoreInstance409JSONResponse{ + Code: "insufficient_resources", + Message: err.Error(), + }, nil default: log.ErrorContext(ctx, "failed to restore instance", "error", err) return oapi.RestoreInstance500JSONResponse{ @@ -684,6 +689,11 @@ func (s *ApiService) ForkInstance(ctx context.Context, request oapi.ForkInstance Code: "name_conflict", Message: err.Error(), }, nil + case errors.Is(err, instances.ErrInsufficientResources): + return oapi.ForkInstance409JSONResponse{ + Code: "insufficient_resources", + Message: err.Error(), + }, nil case errors.Is(err, instances.ErrNotSupported): return oapi.ForkInstance501JSONResponse{ Code: "not_supported", diff --git a/cmd/api/api/instances_test.go b/cmd/api/api/instances_test.go index 896a0632..430b4ad6 100644 --- a/cmd/api/api/instances_test.go +++ b/cmd/api/api/instances_test.go @@ -1265,6 +1265,43 @@ func TestForkInstance_InvalidRequest(t *testing.T) { assert.Equal(t, "invalid_request", badReq.Code) } +func TestForkInstance_InsufficientResources(t *testing.T) { + t.Parallel() + svc := newTestService(t) + + source := instances.Instance{ + StoredMetadata: instances.StoredMetadata{ + Id: "src-instance", + Name: "src-instance", + Image: "docker.io/library/alpine:latest", + CreatedAt: time.Now(), + HypervisorType: hypervisor.TypeFirecracker, + }, + State: instances.StateStandby, + } + + mockMgr := &captureForkManager{ + Manager: svc.InstanceManager, + err: fmt.Errorf("apply fork target state: %w: insufficient network bandwidth", instances.ErrInsufficientResources), + } + svc.InstanceManager = mockMgr + + resp, err := svc.ForkInstance( + mw.WithResolvedInstance(ctx(), source.Id, source), + oapi.ForkInstanceRequestObject{ + Id: source.Id, + Body: &oapi.ForkInstanceRequest{ + Name: "forked-instance", + }, + }, + ) + require.NoError(t, err) + + conflict, ok := resp.(oapi.ForkInstance409JSONResponse) + require.True(t, ok, "expected 409 response") + assert.Equal(t, "insufficient_resources", conflict.Code) +} + func TestStandbyInstance_InvalidRequest(t *testing.T) { t.Parallel() svc := newTestService(t) diff --git a/cmd/api/api/snapshots.go b/cmd/api/api/snapshots.go index ffe35474..7e4060eb 100644 --- a/cmd/api/api/snapshots.go +++ b/cmd/api/api/snapshots.go @@ -196,6 +196,8 @@ func (s *ApiService) ForkSnapshot(ctx context.Context, request oapi.ForkSnapshot return oapi.ForkSnapshot400JSONResponse{Code: "invalid_request", Message: err.Error()}, nil case errors.Is(err, instances.ErrInvalidState), errors.Is(err, instances.ErrAlreadyExists), errors.Is(err, network.ErrNameExists): return oapi.ForkSnapshot409JSONResponse{Code: "conflict", Message: err.Error()}, nil + case errors.Is(err, instances.ErrInsufficientResources): + return oapi.ForkSnapshot409JSONResponse{Code: "insufficient_resources", Message: err.Error()}, nil case errors.Is(err, instances.ErrNotSupported): return oapi.ForkSnapshot501JSONResponse{Code: "not_supported", Message: err.Error()}, nil default: diff --git a/lib/hypervisor/firecracker/fork.go b/lib/hypervisor/firecracker/fork.go index 67a70293..eac8d57b 100644 --- a/lib/hypervisor/firecracker/fork.go +++ b/lib/hypervisor/firecracker/fork.go @@ -50,9 +50,20 @@ func (s *Starter) PrepareFork(ctx context.Context, req hypervisor.ForkPrepareReq } } if req.SourceDataDir != "" && req.TargetDataDir != "" && req.SourceDataDir != req.TargetDataDir { - if meta.RetainSnapshotSourceDataDirAlias && meta.SnapshotSourceDataDir != "" { - // Keep the upstream source path for snapshot-derived forks. The retained - // Firecracker base can still reference that path after later diff snapshots. + statePath := snapshotStatePath(filepath.Dir(req.SnapshotConfigPath)) + rewritten, err := rewriteSnapshotStatePathsForFork(statePath, snapshotStatePathRewritesForFork(meta, req.SourceDataDir, req.TargetDataDir)) + if err != nil { + return hypervisor.ForkPrepareResult{}, err + } + if rewritten { + if meta.SnapshotSourceDataDir != "" { + meta.SnapshotSourceDataDir = "" + changed = true + } + if meta.RetainSnapshotSourceDataDirAlias { + meta.RetainSnapshotSourceDataDirAlias = false + changed = true + } } else { retainAlias := false if _, err := os.Stat(req.SourceDataDir); err != nil { @@ -62,13 +73,18 @@ func (s *Starter) PrepareFork(ctx context.Context, req hypervisor.ForkPrepareReq return hypervisor.ForkPrepareResult{}, fmt.Errorf("stat snapshot source data dir %q: %w", req.SourceDataDir, err) } } - if meta.SnapshotSourceDataDir != req.SourceDataDir { - meta.SnapshotSourceDataDir = req.SourceDataDir - changed = true - } - if meta.RetainSnapshotSourceDataDirAlias != retainAlias { - meta.RetainSnapshotSourceDataDirAlias = retainAlias - changed = true + if meta.RetainSnapshotSourceDataDirAlias && meta.SnapshotSourceDataDir != "" { + // Keep the upstream source path for snapshot-derived forks. The retained + // Firecracker base can still reference that path after later diff snapshots. + } else { + if meta.SnapshotSourceDataDir != req.SourceDataDir { + meta.SnapshotSourceDataDir = req.SourceDataDir + changed = true + } + if meta.RetainSnapshotSourceDataDirAlias != retainAlias { + meta.RetainSnapshotSourceDataDirAlias = retainAlias + changed = true + } } } } @@ -81,3 +97,34 @@ func (s *Starter) PrepareFork(ctx context.Context, req hypervisor.ForkPrepareReq return hypervisor.ForkPrepareResult{}, nil } + +func snapshotStatePathRewritesForFork(meta *restoreMetadata, sourceDataDir, targetDataDir string) []snapshotStatePathRewrite { + var rewrites []snapshotStatePathRewrite + add := func(source, target string) { + if source == "" || target == "" { + return + } + rewrites = append(rewrites, snapshotStatePathRewrite{Source: source, Target: target}) + } + + add(sourceDataDir, targetDataDir) + if meta != nil { + add(meta.SnapshotSourceDataDir, targetDataDir) + } + if resolvedTarget, err := filepath.EvalSymlinks(targetDataDir); err == nil { + add(sourceDataDir, resolvedTarget) + if meta != nil { + add(meta.SnapshotSourceDataDir, resolvedTarget) + } + if resolvedSource, err := filepath.EvalSymlinks(sourceDataDir); err == nil { + add(resolvedSource, resolvedTarget) + } + if meta != nil && meta.SnapshotSourceDataDir != "" { + if resolvedSource, err := filepath.EvalSymlinks(meta.SnapshotSourceDataDir); err == nil { + add(resolvedSource, resolvedTarget) + } + } + } + + return rewrites +} diff --git a/lib/hypervisor/firecracker/fork_test.go b/lib/hypervisor/firecracker/fork_test.go index 109ae7e7..1d92ae63 100644 --- a/lib/hypervisor/firecracker/fork_test.go +++ b/lib/hypervisor/firecracker/fork_test.go @@ -2,6 +2,7 @@ package firecracker import ( "context" + "encoding/binary" "os" "path/filepath" "testing" @@ -65,6 +66,72 @@ func TestPrepareFork_DoesNotRetainExistingSourceAlias(t *testing.T) { assert.False(t, meta.RetainSnapshotSourceDataDirAlias) } +func TestPrepareFork_RewritesSnapshotStatePathsWithoutAlias(t *testing.T) { + starter := NewStarter() + tmp := t.TempDir() + sourceDir := filepath.Join(tmp, "source-12345678901234567890") + targetDir := filepath.Join(tmp, "target-12345678901234567890") + snapshotDir := filepath.Join(targetDir, "snapshots", "snapshot-latest") + require.NoError(t, os.MkdirAll(sourceDir, 0755)) + require.NoError(t, os.MkdirAll(snapshotDir, 0755)) + require.NoError(t, saveRestoreMetadata(targetDir, []networkInterface{{IfaceID: "eth0", HostDevName: "tap-old"}})) + writeSnapshotStateForTest(t, snapshotStatePath(snapshotDir), []byte("drive="+filepath.Join(sourceDir, "overlay.raw")+" vsock="+filepath.Join(sourceDir, "vsock.sock"))) + + _, err := starter.PrepareFork(context.Background(), hypervisor.ForkPrepareRequest{ + SnapshotConfigPath: filepath.Join(snapshotDir, "config.json"), + SourceDataDir: sourceDir, + TargetDataDir: targetDir, + Network: &hypervisor.ForkNetworkConfig{ + TAPDevice: "tap-new", + }, + }) + require.NoError(t, err) + + meta, err := loadRestoreMetadata(targetDir) + require.NoError(t, err) + require.Len(t, meta.NetworkOverrides, 1) + assert.Equal(t, "tap-new", meta.NetworkOverrides[0].HostDevName) + assert.Empty(t, meta.SnapshotSourceDataDir) + assert.False(t, meta.RetainSnapshotSourceDataDirAlias) + + data, err := os.ReadFile(snapshotStatePath(snapshotDir)) + require.NoError(t, err) + assert.Equal(t, uint64(0), firecrackerSnapshotCRC64(data)) + assert.NotContains(t, string(data), sourceDir) + assert.Contains(t, string(data), filepath.Join(targetDir, "overlay.raw")) + assert.Contains(t, string(data), filepath.Join(targetDir, "vsock.sock")) +} + +func TestPrepareFork_FallsBackToAliasWhenSnapshotStatePathLengthDiffers(t *testing.T) { + starter := NewStarter() + tmp := t.TempDir() + sourceDir := filepath.Join(tmp, "source-short") + targetDir := filepath.Join(tmp, "target-much-longer") + snapshotDir := filepath.Join(targetDir, "snapshots", "snapshot-latest") + require.NoError(t, os.MkdirAll(sourceDir, 0755)) + require.NoError(t, os.MkdirAll(snapshotDir, 0755)) + require.NoError(t, saveRestoreMetadata(targetDir, nil)) + writeSnapshotStateForTest(t, snapshotStatePath(snapshotDir), []byte("drive="+filepath.Join(sourceDir, "overlay.raw"))) + + _, err := starter.PrepareFork(context.Background(), hypervisor.ForkPrepareRequest{ + SnapshotConfigPath: filepath.Join(snapshotDir, "config.json"), + SourceDataDir: sourceDir, + TargetDataDir: targetDir, + }) + require.NoError(t, err) + + meta, err := loadRestoreMetadata(targetDir) + require.NoError(t, err) + assert.Equal(t, sourceDir, meta.SnapshotSourceDataDir) + assert.False(t, meta.RetainSnapshotSourceDataDirAlias) + + data, err := os.ReadFile(snapshotStatePath(snapshotDir)) + require.NoError(t, err) + assert.Equal(t, uint64(0), firecrackerSnapshotCRC64(data)) + assert.Contains(t, string(data), sourceDir) + assert.NotContains(t, string(data), targetDir) +} + func TestPrepareFork_ReturnsSourceStatErrors(t *testing.T) { starter := NewStarter() tmp := t.TempDir() @@ -137,3 +204,11 @@ func TestPrepareFork_NetworkRewritePreservesRetainedAlias(t *testing.T) { assert.Equal(t, upstreamDir, meta.SnapshotSourceDataDir) assert.True(t, meta.RetainSnapshotSourceDataDirAlias) } + +func writeSnapshotStateForTest(t *testing.T, path string, body []byte) { + t.Helper() + data := make([]byte, len(body)+8) + copy(data, body) + binary.LittleEndian.PutUint64(data[len(data)-8:], firecrackerSnapshotCRC64(body)) + require.NoError(t, os.WriteFile(path, data, 0644)) +} diff --git a/lib/hypervisor/firecracker/process.go b/lib/hypervisor/firecracker/process.go index 64bddf69..2b0dfebd 100644 --- a/lib/hypervisor/firecracker/process.go +++ b/lib/hypervisor/firecracker/process.go @@ -8,7 +8,6 @@ import ( "os/exec" "path/filepath" "strings" - "sync" "syscall" "time" @@ -62,8 +61,6 @@ func WithUFFDClient(client UFFDClient) StarterOption { var _ hypervisor.VMStarter = (*Starter)(nil) -var snapshotSourceAliasMu sync.Mutex - func (s *Starter) SocketName() string { return "fc.sock" } @@ -158,13 +155,19 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, createdUFFDSession = resp.SessionID backend = snapshotMemBackend{BackendType: "Uffd", BackendPath: resp.UFFDSocketPath} } - err = func() error { - snapshotSourceAliasMu.Lock() - defer snapshotSourceAliasMu.Unlock() - return withSnapshotSourceDirAlias(meta, filepath.Dir(socketPath), func() error { - return hv.loadSnapshot(ctx, snapshotPath, meta.NetworkOverrides, resumeOnLoad, backend) - }) - }() + targetDataDir := filepath.Dir(socketPath) + loadSnapshot := func() error { + return hv.loadSnapshot(ctx, snapshotPath, meta.NetworkOverrides, resumeOnLoad, backend) + } + if needsSnapshotSourceDirAlias(meta, targetDataDir) { + err = func() error { + unlockAlias := hypervisor.LockSnapshotSourceAliasMutation() + defer unlockAlias() + return withSnapshotSourceDirAlias(meta, targetDataDir, loadSnapshot) + }() + } else { + err = loadSnapshot() + } if err != nil { if createdUFFDSession != "" { _ = s.uffd.CloseSession(context.Background(), createdUFFDSession) @@ -247,6 +250,15 @@ func withSnapshotSourceDirAlias(meta *restoreMetadata, targetDataDir string, run return nil } +func needsSnapshotSourceDirAlias(meta *restoreMetadata, targetDataDir string) bool { + if meta == nil || meta.SnapshotSourceDataDir == "" { + return false + } + sourceDataDir := filepath.Clean(meta.SnapshotSourceDataDir) + targetDataDir = filepath.Clean(targetDataDir) + return sourceDataDir != targetDataDir +} + func (s *Starter) startProcess(_ context.Context, p *paths.Paths, version string, socketPath string) (int, error) { binaryPath, err := s.GetBinaryPath(p, version) if err != nil { diff --git a/lib/hypervisor/firecracker/snapshot_state.go b/lib/hypervisor/firecracker/snapshot_state.go new file mode 100644 index 00000000..77abd5c1 --- /dev/null +++ b/lib/hypervisor/firecracker/snapshot_state.go @@ -0,0 +1,108 @@ +package firecracker + +import ( + "bytes" + "encoding/binary" + "fmt" + "os" + "path/filepath" +) + +const firecrackerSnapshotCRC64JonesReversedPoly uint64 = 0x95ac9329ac4bc9b5 + +type snapshotStatePathRewrite struct { + Source string + Target string +} + +func rewriteSnapshotStatePathsForFork(statePath string, rewrites []snapshotStatePathRewrite) (bool, error) { + if len(rewrites) == 0 { + return false, nil + } + + data, err := os.ReadFile(statePath) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, fmt.Errorf("read firecracker snapshot state: %w", err) + } + if len(data) < 8 { + return false, fmt.Errorf("firecracker snapshot state is too short") + } + if firecrackerSnapshotCRC64(data) != 0 { + return false, fmt.Errorf("firecracker snapshot state CRC64 validation failed before path rewrite") + } + + body := append([]byte(nil), data[:len(data)-8]...) + replaced := 0 + targetsBySource := make(map[string][]string) + sources := make([]string, 0, len(rewrites)) + seen := make(map[string]struct{}, len(rewrites)) + for _, rewrite := range rewrites { + source := filepath.Clean(rewrite.Source) + target := filepath.Clean(rewrite.Target) + if source == "." || target == "." || source == target { + continue + } + key := source + "\x00" + target + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + if _, ok := targetsBySource[source]; !ok { + sources = append(sources, source) + } + targetsBySource[source] = append(targetsBySource[source], target) + } + + for _, source := range sources { + sourceBytes := []byte(source) + count := bytes.Count(body, sourceBytes) + if count == 0 { + continue + } + var targetBytes []byte + for _, target := range targetsBySource[source] { + candidate := []byte(target) + if len(sourceBytes) == len(candidate) { + targetBytes = candidate + break + } + } + if len(targetBytes) == 0 { + return false, nil + } + body = bytes.ReplaceAll(body, sourceBytes, targetBytes) + replaced += count + } + if replaced == 0 { + return false, nil + } + + out := make([]byte, len(body)+8) + copy(out, body) + binary.LittleEndian.PutUint64(out[len(out)-8:], firecrackerSnapshotCRC64(body)) + if firecrackerSnapshotCRC64(out) != 0 { + return false, fmt.Errorf("firecracker snapshot state CRC64 validation failed after path rewrite") + } + if err := os.WriteFile(statePath, out, 0644); err != nil { + return false, fmt.Errorf("write firecracker snapshot state: %w", err) + } + return true, nil +} + +func firecrackerSnapshotCRC64(data []byte) uint64 { + crc := uint64(0) + for _, b := range data { + crc ^= uint64(b) + for i := 0; i < 8; i++ { + if crc&1 != 0 { + crc = (crc >> 1) ^ firecrackerSnapshotCRC64JonesReversedPoly + } else { + crc >>= 1 + } + } + } + return crc +} diff --git a/lib/hypervisor/snapshot_alias_lock.go b/lib/hypervisor/snapshot_alias_lock.go new file mode 100644 index 00000000..73ba5927 --- /dev/null +++ b/lib/hypervisor/snapshot_alias_lock.go @@ -0,0 +1,20 @@ +package hypervisor + +import "sync" + +var snapshotSourceAliasMu sync.RWMutex + +// LockSnapshotSourceAliasMutation serializes the short Firecracker restore +// window where a snapshotted source guest directory is temporarily replaced by +// a symlink to the fork being restored. +func LockSnapshotSourceAliasMutation() func() { + snapshotSourceAliasMu.Lock() + return snapshotSourceAliasMu.Unlock +} + +// LockSnapshotSourceAliasReaders blocks source-directory readers while a +// snapshot source alias mutation is active. +func LockSnapshotSourceAliasReaders() func() { + snapshotSourceAliasMu.RLock() + return snapshotSourceAliasMu.RUnlock +} diff --git a/lib/instances/fork.go b/lib/instances/fork.go index 308d94dc..40eb79da 100644 --- a/lib/instances/fork.go +++ b/lib/instances/fork.go @@ -253,17 +253,25 @@ func (m *manager) forkInstanceFromStoppedOrStandby(ctx context.Context, id strin }) defer cu.Clean() - if source.State == StateStandby { - if err := m.ensureSnapshotMemoryReady(ctx, m.paths.InstanceSnapshotLatest(id), m.snapshotJobKeyForInstance(id), stored.HypervisorType); err != nil { - return nil, fmt.Errorf("prepare standby snapshot for fork: %w", err) + if err := func() error { + unlockAliasReaders := hypervisor.LockSnapshotSourceAliasReaders() + defer unlockAliasReaders() + + if source.State == StateStandby { + if err := m.ensureSnapshotMemoryReady(ctx, m.paths.InstanceSnapshotLatest(id), m.snapshotJobKeyForInstance(id), stored.HypervisorType); err != nil { + return fmt.Errorf("prepare standby snapshot for fork: %w", err) + } } - } - if err := forkvm.CopyGuestDirectory(srcDir, dstDir); err != nil { - if errors.Is(err, forkvm.ErrSparseCopyUnsupported) { - return nil, fmt.Errorf("fork requires sparse-capable filesystem (SEEK_DATA/SEEK_HOLE unsupported): %w", err) + if err := forkvm.CopyGuestDirectory(srcDir, dstDir); err != nil { + if errors.Is(err, forkvm.ErrSparseCopyUnsupported) { + return fmt.Errorf("fork requires sparse-capable filesystem (SEEK_DATA/SEEK_HOLE unsupported): %w", err) + } + return fmt.Errorf("clone guest directory: %w", err) } - return nil, fmt.Errorf("clone guest directory: %w", err) + return nil + }(); err != nil { + return nil, err } starter, err := m.getVMStarter(stored.HypervisorType) @@ -324,15 +332,21 @@ func (m *manager) forkInstanceFromStoppedOrStandby(ctx context.Context, id strin if forkMeta.NetworkEnabled { netCfg = &hypervisor.ForkNetworkConfig{TAPDevice: network.GenerateTAPName(forkID)} } - if _, err := starter.PrepareFork(ctx, hypervisor.ForkPrepareRequest{ - SnapshotConfigPath: snapshotConfigPath, - SourceDataDir: stored.DataDir, - TargetDataDir: forkMeta.DataDir, - VsockCID: forkMeta.VsockCID, - VsockSocket: forkMeta.VsockSocket, - SerialLogPath: m.paths.InstanceAppLog(forkID), - Network: netCfg, - }); err != nil { + err := func() error { + unlockAliasReaders := hypervisor.LockSnapshotSourceAliasReaders() + defer unlockAliasReaders() + _, err := starter.PrepareFork(ctx, hypervisor.ForkPrepareRequest{ + SnapshotConfigPath: snapshotConfigPath, + SourceDataDir: stored.DataDir, + TargetDataDir: forkMeta.DataDir, + VsockCID: forkMeta.VsockCID, + VsockSocket: forkMeta.VsockSocket, + SerialLogPath: m.paths.InstanceAppLog(forkID), + Network: netCfg, + }) + return err + }() + if err != nil { if errors.Is(err, hypervisor.ErrNotSupported) { return nil, fmt.Errorf("%w: fork is not supported for hypervisor %s", ErrNotSupported, stored.HypervisorType) } diff --git a/lib/instances/fork_test.go b/lib/instances/fork_test.go index 32764063..58d0bc1c 100644 --- a/lib/instances/fork_test.go +++ b/lib/instances/fork_test.go @@ -137,6 +137,116 @@ func TestCleanupForkInstanceOnError(t *testing.T) { assert.ErrorIs(t, err, ErrNotFound) } +func TestGetInstanceWaitsDuringSnapshotSourceAlias(t *testing.T) { + manager, _ := setupTestManager(t) + ctx := context.Background() + + sourceID := "fork-alias-source" + aliasID := "fork-alias-child" + now := time.Now() + for _, meta := range []*metadata{ + {StoredMetadata: StoredMetadata{ + Id: sourceID, + Name: sourceID, + Image: integrationTestImageRef(t, "docker.io/library/alpine:latest"), + CreatedAt: now, + StoppedAt: &now, + HypervisorType: hypervisor.TypeFirecracker, + HypervisorVersion: "test", + SocketPath: paths.New(manager.paths.DataDir()).InstanceSocket(sourceID, "fc.sock"), + DataDir: paths.New(manager.paths.DataDir()).InstanceDir(sourceID), + VsockCID: 42, + VsockSocket: paths.New(manager.paths.DataDir()).InstanceVsockSocket(sourceID), + }}, + {StoredMetadata: StoredMetadata{ + Id: aliasID, + Name: aliasID, + Image: integrationTestImageRef(t, "docker.io/library/alpine:latest"), + CreatedAt: now, + HypervisorType: hypervisor.TypeFirecracker, + HypervisorVersion: "test", + SocketPath: paths.New(manager.paths.DataDir()).InstanceSocket(aliasID, "fc.sock"), + DataDir: paths.New(manager.paths.DataDir()).InstanceDir(aliasID), + VsockCID: 43, + VsockSocket: paths.New(manager.paths.DataDir()).InstanceVsockSocket(aliasID), + }}, + } { + require.NoError(t, manager.ensureDirectories(meta.Id)) + require.NoError(t, manager.saveMetadata(meta)) + } + + sourceDir := manager.paths.InstanceDir(sourceID) + childDir := manager.paths.InstanceDir(aliasID) + backupDir := sourceDir + ".bak" + + unlockAlias := hypervisor.LockSnapshotSourceAliasMutation() + aliasLocked := true + defer func() { + if aliasLocked { + unlockAlias() + } + }() + require.NoError(t, os.Rename(sourceDir, backupDir)) + require.NoError(t, os.Symlink(childDir, sourceDir)) + + type getResult struct { + inst *Instance + err error + } + done := make(chan getResult, 1) + go func() { + inst, err := manager.GetInstance(ctx, sourceID) + done <- getResult{inst: inst, err: err} + }() + + select { + case got := <-done: + t.Fatalf("GetInstance returned during source alias mutation: inst=%v err=%v", got.inst, got.err) + case <-time.After(25 * time.Millisecond): + } + + require.NoError(t, os.Remove(sourceDir)) + require.NoError(t, os.Rename(backupDir, sourceDir)) + unlockAlias() + aliasLocked = false + + got := <-done + require.NoError(t, got.err) + require.NotNil(t, got.inst) + assert.Equal(t, sourceID, got.inst.Id) +} + +func TestForkInstanceStoppedSourceUsesReadLock(t *testing.T) { + manager, _ := setupTestManager(t) + ctx := context.Background() + + sourceID := "fork-stopped-read-lock-source" + createStoppedSnapshotSourceFixture(t, manager, sourceID, sourceID, hypervisor.TypeQEMU) + + sourceLock := manager.getInstanceLock(sourceID) + sourceLock.RLock() + defer sourceLock.RUnlock() + + type forkResult struct { + inst *Instance + err error + } + done := make(chan forkResult, 1) + go func() { + inst, err := manager.ForkInstance(ctx, sourceID, ForkInstanceRequest{Name: "fork-stopped-read-lock-copy"}) + done <- forkResult{inst: inst, err: err} + }() + + select { + case got := <-done: + require.NoError(t, got.err) + require.NotNil(t, got.inst) + assert.Equal(t, StateStopped, got.inst.State) + case <-time.After(250 * time.Millisecond): + t.Fatal("ForkInstance blocked behind a source read lock") + } +} + func TestForkInstance_CleansUpOnTargetTransitionError(t *testing.T) { t.Parallel() manager, _ := setupTestManager(t) diff --git a/lib/instances/manager.go b/lib/instances/manager.go index 599dee75..2915c942 100644 --- a/lib/instances/manager.go +++ b/lib/instances/manager.go @@ -415,9 +415,35 @@ func (m *manager) DeleteSnapshot(ctx context.Context, snapshotID string) error { // ForkInstance creates a forked copy of an instance. func (m *manager) ForkInstance(ctx context.Context, id string, req ForkInstanceRequest) (*Instance, error) { lock := m.getInstanceLock(id) - lock.Lock() - forked, targetState, err := m.forkInstance(ctx, id, req) - lock.Unlock() + useReadLock := false + lock.RLock() + if meta, err := m.loadMetadata(id); err == nil { + source := m.toInstance(ctx, meta) + useReadLock = source.State == StateStopped || source.State == StateStandby + } + lock.RUnlock() + + var forked *Instance + var targetState State + var err error + if useReadLock { + lock.RLock() + if meta, loadErr := m.loadMetadata(id); loadErr == nil { + source := m.toInstance(ctx, meta) + useReadLock = source.State == StateStopped || source.State == StateStandby + } else { + useReadLock = false + } + if useReadLock { + forked, targetState, err = m.forkInstance(ctx, id, req) + } + lock.RUnlock() + } + if !useReadLock { + lock.Lock() + forked, targetState, err = m.forkInstance(ctx, id, req) + lock.Unlock() + } if err != nil { return nil, err } diff --git a/lib/instances/snapshot.go b/lib/instances/snapshot.go index 606ec8f6..54dec7b1 100644 --- a/lib/instances/snapshot.go +++ b/lib/instances/snapshot.go @@ -412,15 +412,23 @@ func (m *manager) forkSnapshot(ctx context.Context, snapshotID string, req ForkS if target != nil && target.State == compressionJobStateRunning { m.recordSnapshotCompressionPreemption(ctx, snapshotCompressionPreemptionForkSnapshot, target.Target) } - if err := m.ensureSnapshotMemoryReady(ctx, m.paths.SnapshotGuestDir(snapshotID), "", rec.StoredMetadata.HypervisorType); err != nil { - return nil, fmt.Errorf("prepare snapshot memory for fork: %w", err) - } + if err := func() error { + unlockAliasReaders := hypervisor.LockSnapshotSourceAliasReaders() + defer unlockAliasReaders() - if err := forkvm.CopyGuestDirectory(m.paths.SnapshotGuestDir(snapshotID), dstDir); err != nil { - if errors.Is(err, forkvm.ErrSparseCopyUnsupported) { - return nil, fmt.Errorf("fork from snapshot requires sparse-capable filesystem (SEEK_DATA/SEEK_HOLE unsupported): %w", err) + if err := m.ensureSnapshotMemoryReady(ctx, m.paths.SnapshotGuestDir(snapshotID), "", rec.StoredMetadata.HypervisorType); err != nil { + return fmt.Errorf("prepare snapshot memory for fork: %w", err) + } + + if err := forkvm.CopyGuestDirectory(m.paths.SnapshotGuestDir(snapshotID), dstDir); err != nil { + if errors.Is(err, forkvm.ErrSparseCopyUnsupported) { + return fmt.Errorf("fork from snapshot requires sparse-capable filesystem (SEEK_DATA/SEEK_HOLE unsupported): %w", err) + } + return fmt.Errorf("clone snapshot payload: %w", err) } - return nil, fmt.Errorf("clone snapshot payload: %w", err) + return nil + }(); err != nil { + return nil, err } starter, err := m.getVMStarter(targetHypervisor) @@ -470,15 +478,21 @@ func (m *manager) forkSnapshot(ctx context.Context, snapshotID string, req ForkS if forkMeta.NetworkEnabled { netCfg = &hypervisor.ForkNetworkConfig{TAPDevice: network.GenerateTAPName(forkID)} } - if _, err := starter.PrepareFork(ctx, hypervisor.ForkPrepareRequest{ - SnapshotConfigPath: m.paths.InstanceSnapshotConfig(forkID), - SourceDataDir: rec.StoredMetadata.DataDir, - TargetDataDir: forkMeta.DataDir, - VsockCID: forkMeta.VsockCID, - VsockSocket: forkMeta.VsockSocket, - SerialLogPath: m.paths.InstanceAppLog(forkID), - Network: netCfg, - }); err != nil { + err := func() error { + unlockAliasReaders := hypervisor.LockSnapshotSourceAliasReaders() + defer unlockAliasReaders() + _, err := starter.PrepareFork(ctx, hypervisor.ForkPrepareRequest{ + SnapshotConfigPath: m.paths.InstanceSnapshotConfig(forkID), + SourceDataDir: rec.StoredMetadata.DataDir, + TargetDataDir: forkMeta.DataDir, + VsockCID: forkMeta.VsockCID, + VsockSocket: forkMeta.VsockSocket, + SerialLogPath: m.paths.InstanceAppLog(forkID), + Network: netCfg, + }) + return err + }() + if err != nil { if errors.Is(err, hypervisor.ErrNotSupported) { return nil, fmt.Errorf("%w: snapshot fork is not supported for hypervisor %s", ErrNotSupported, targetHypervisor) } @@ -504,7 +518,12 @@ func (m *manager) forkSnapshot(ctx context.Context, snapshotID string, req ForkS } func (m *manager) copySnapshotPayload(sourceInstanceID, snapshotGuestDir string) error { - if err := forkvm.CopyGuestDirectory(m.paths.InstanceDir(sourceInstanceID), snapshotGuestDir); err != nil { + err := func() error { + unlockAliasReaders := hypervisor.LockSnapshotSourceAliasReaders() + defer unlockAliasReaders() + return forkvm.CopyGuestDirectory(m.paths.InstanceDir(sourceInstanceID), snapshotGuestDir) + }() + if err != nil { if errors.Is(err, forkvm.ErrSparseCopyUnsupported) { return fmt.Errorf("snapshot requires sparse-capable filesystem (SEEK_DATA/SEEK_HOLE unsupported): %w", err) } diff --git a/lib/instances/storage.go b/lib/instances/storage.go index 776c6124..f33a5962 100644 --- a/lib/instances/storage.go +++ b/lib/instances/storage.go @@ -11,6 +11,7 @@ import ( "github.com/kernel/hypeman/lib/autostandby" "github.com/kernel/hypeman/lib/healthcheck" + "github.com/kernel/hypeman/lib/hypervisor" "github.com/kernel/hypeman/lib/images" ) @@ -61,6 +62,9 @@ func (m *manager) ensureDirectories(id string) error { // loadMetadata loads instance metadata from disk func (m *manager) loadMetadata(id string) (*metadata, error) { + unlockAliasReaders := hypervisor.LockSnapshotSourceAliasReaders() + defer unlockAliasReaders() + metaPath := m.paths.InstanceMetadata(id) data, err := os.ReadFile(metaPath) @@ -81,6 +85,9 @@ func (m *manager) loadMetadata(id string) (*metadata, error) { // saveMetadata saves instance metadata to disk func (m *manager) saveMetadata(meta *metadata) error { + unlockAliasReaders := hypervisor.LockSnapshotSourceAliasReaders() + defer unlockAliasReaders() + metaPath := m.paths.InstanceMetadata(meta.Id) data, err := json.MarshalIndent(meta, "", " ") From 620f46c824bad788a5f0ce4fed7107dddda07d48 Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Sun, 31 May 2026 23:40:53 -0400 Subject: [PATCH 2/3] Trace network allocation substeps --- lib/network/allocate.go | 96 ++++++++++++++++++++++++++---- lib/network/bridge_linux.go | 114 ++++++++++++++++++++++++++++++++---- lib/network/derive.go | 17 +++++- lib/network/tracing.go | 36 ++++++++++++ 4 files changed, 241 insertions(+), 22 deletions(-) create mode 100644 lib/network/tracing.go diff --git a/lib/network/allocate.go b/lib/network/allocate.go index 8f6badfc..6eb27b95 100644 --- a/lib/network/allocate.go +++ b/lib/network/allocate.go @@ -11,6 +11,7 @@ import ( "time" "github.com/kernel/hypeman/lib/logger" + "go.opentelemetry.io/otel/attribute" ) // CreateAllocation allocates IP/MAC/TAP for instance on the default network @@ -19,7 +20,12 @@ func (m *manager) CreateAllocation(ctx context.Context, req AllocateRequest) (*N // Resolve bridge/default network before taking allocation lock so // self-heal retries don't block other allocation/release operations. - network, err := m.getOrInitDefaultNetwork(ctx) + networkCtx, networkSpanEnd := startNetworkStep(ctx, "network.get_default_network", + attribute.String("operation", "get_default_network"), + attribute.String("instance_id", req.InstanceID), + ) + network, err := m.getOrInitDefaultNetwork(networkCtx) + networkSpanEnd(err) if err != nil { return nil, err } @@ -27,31 +33,49 @@ func (m *manager) CreateAllocation(ctx context.Context, req AllocateRequest) (*N // Acquire lock to prevent concurrent allocations from: // 1. Picking the same IP address // 2. Creating duplicate instance names + waitCtx, waitSpanEnd := startNetworkStep(ctx, "network.create_allocation.wait_for_mutex", + attribute.String("operation", "wait_for_mutex"), + attribute.String("instance_id", req.InstanceID), + ) m.mu.Lock() + waitSpanEnd(nil) defer m.mu.Unlock() + lockedCtx, lockedSpanEnd := startNetworkStep(waitCtx, "network.create_allocation.locked", + attribute.String("operation", "create_allocation_locked"), + attribute.String("instance_id", req.InstanceID), + ) + var lockedErr error + defer func() { + lockedSpanEnd(lockedErr) + }() + // 1. Check name uniqueness (exclude current instance to allow restarts) - exists, err := m.NameExists(ctx, req.InstanceName, req.InstanceID) + exists, err := m.NameExists(lockedCtx, req.InstanceName, req.InstanceID) if err != nil { + lockedErr = err return nil, fmt.Errorf("check name exists: %w", err) } if exists { - return nil, fmt.Errorf("%w: instance name '%s' already exists, can't assign into same network: %s", + lockedErr = fmt.Errorf("%w: instance name '%s' already exists, can't assign into same network: %s", ErrNameExists, req.InstanceName, network.Name) + return nil, lockedErr } // 2. Allocate random available IP // Random selection reduces predictability and helps distribute IPs across the subnet. // This is especially useful for large /16 networks and reduces conflicts when // moving standby VMs across hosts. - ip, err := m.allocateNextIP(ctx, network.Subnet) + ip, err := m.allocateNextIP(lockedCtx, network.Subnet) if err != nil { + lockedErr = err return nil, fmt.Errorf("allocate IP: %w", err) } // 3. Generate unused MAC (02:00:00:... format - locally administered) - mac, err := m.allocateUniqueMAC(ctx) + mac, err := m.allocateUniqueMAC(lockedCtx) if err != nil { + lockedErr = err return nil, fmt.Errorf("allocate MAC: %w", err) } @@ -59,21 +83,39 @@ func (m *manager) CreateAllocation(ctx context.Context, req AllocateRequest) (*N tap := GenerateTAPName(req.InstanceID) // 5. Create TAP device with bidirectional rate limiting - classID, err := m.createTAPDevice(ctx, tap, network.Bridge, network.Isolated, req.DownloadBps, req.UploadBps, req.UploadCeilBps) + tapCtx, tapSpanEnd := startNetworkStep(lockedCtx, "network.create_tap", + attribute.String("operation", "create_tap"), + attribute.String("instance_id", req.InstanceID), + attribute.String("tap", tap), + attribute.Bool("isolated", network.Isolated), + attribute.Bool("download_rate_limit", req.DownloadBps > 0), + attribute.Bool("upload_rate_limit", req.UploadBps > 0), + ) + classID, err := m.createTAPDevice(tapCtx, tap, network.Bridge, network.Isolated, req.DownloadBps, req.UploadBps, req.UploadCeilBps) + tapSpanEnd(err) if err != nil { + lockedErr = err return nil, fmt.Errorf("create TAP device: %w", err) } m.recordTAPOperation(ctx, "create") // Persist assigned tc class ID so removal uses the correct ID after collisions. // Clear any stale file when no rate limiting was applied. + _, saveClassSpanEnd := startNetworkStep(lockedCtx, "network.save_class_id", + attribute.String("operation", "save_class_id"), + attribute.String("instance_id", req.InstanceID), + attribute.Bool("has_class_id", classID != ""), + ) if classID != "" { if err := m.saveClassID(req.InstanceID, classID); err != nil { + saveClassSpanEnd(err) + lockedErr = err return nil, fmt.Errorf("save class ID: %w", err) } } else { m.clearClassID(req.InstanceID) } + saveClassSpanEnd(nil) log.InfoContext(ctx, "allocated network", "instance_id", req.InstanceID, @@ -225,15 +267,30 @@ func (m *manager) getOrInitDefaultNetwork(ctx context.Context) (*Network, error) // allocateNextIP picks a random available IP in the subnet // Retries up to 5 times if conflicts occur func (m *manager) allocateNextIP(ctx context.Context, subnet string) (string, error) { + chooseCtx, chooseSpanEnd := startNetworkStep(ctx, "network.allocate_ip", + attribute.String("operation", "allocate_ip"), + ) + var chooseErr error + defer func() { + chooseSpanEnd(chooseErr) + }() + // Parse subnet _, ipNet, err := net.ParseCIDR(subnet) if err != nil { + chooseErr = err return "", fmt.Errorf("parse subnet: %w", err) } // Get all currently allocated IPs - allocations, err := m.ListAllocations(ctx) + listCtx, listSpanEnd := startNetworkStep(chooseCtx, "network.list_allocations", + attribute.String("operation", "list_allocations"), + attribute.String("caller", "allocate_ip"), + ) + allocations, err := m.ListAllocations(listCtx) + listSpanEnd(err) if err != nil { + chooseErr = err return "", fmt.Errorf("list allocations: %w", err) } @@ -286,7 +343,8 @@ func (m *manager) allocateNextIP(ctx context.Context, subnet string) (string, er } } - return "", fmt.Errorf("no available IPs in subnet %s after %d random attempts and full scan", subnet, maxRetries) + chooseErr = fmt.Errorf("no available IPs in subnet %s after %d random attempts and full scan", subnet, maxRetries) + return "", chooseErr } // incrementIP increments IP address by n @@ -319,8 +377,22 @@ const ( // allocateUniqueMAC picks an unused locally administered MAC address. func (m *manager) allocateUniqueMAC(ctx context.Context) (string, error) { - allocations, err := m.ListAllocations(ctx) + chooseCtx, chooseSpanEnd := startNetworkStep(ctx, "network.allocate_mac", + attribute.String("operation", "allocate_mac"), + ) + var chooseErr error + defer func() { + chooseSpanEnd(chooseErr) + }() + + listCtx, listSpanEnd := startNetworkStep(chooseCtx, "network.list_allocations", + attribute.String("operation", "list_allocations"), + attribute.String("caller", "allocate_mac"), + ) + allocations, err := m.ListAllocations(listCtx) + listSpanEnd(err) if err != nil { + chooseErr = err return "", fmt.Errorf("list allocations: %w", err) } @@ -332,7 +404,11 @@ func (m *manager) allocateUniqueMAC(ctx context.Context) (string, error) { } } - return allocateUniqueMACFromSet(usedMACs, generateMAC) + mac, err := allocateUniqueMACFromSet(usedMACs, generateMAC) + if err != nil { + chooseErr = err + } + return mac, err } func allocateUniqueMACFromSet(usedMACs map[string]bool, generate func() (string, error)) (string, error) { diff --git a/lib/network/bridge_linux.go b/lib/network/bridge_linux.go index b05a595d..ddd7a15b 100644 --- a/lib/network/bridge_linux.go +++ b/lib/network/bridge_linux.go @@ -16,6 +16,7 @@ import ( "github.com/kernel/hypeman/lib/logger" "github.com/vishvananda/netlink" + "go.opentelemetry.io/otel/attribute" "golang.org/x/sys/unix" ) @@ -500,9 +501,21 @@ func (m *manager) lastHypemanForwardRulePosition() int { // Returns the tc class ID actually assigned (empty if no upload rate limiting). func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName string, isolated bool, downloadBps, uploadBps, uploadCeilBps int64) (string, error) { // 1. Check if TAP already exists - if _, err := netlink.LinkByName(tapName); err == nil { + _, linkLookupEnd := startNetworkStep(ctx, "network.create_tap.link_lookup_existing", + attribute.String("operation", "link_lookup_existing"), + attribute.String("tap", tapName), + ) + _, err := netlink.LinkByName(tapName) + linkLookupEnd(nil) + if err == nil { // TAP already exists, delete it first - if err := m.deleteTAPDevice(tapName, ""); err != nil { + _, deleteEnd := startNetworkStep(ctx, "network.create_tap.delete_existing", + attribute.String("operation", "delete_existing"), + attribute.String("tap", tapName), + ) + err := m.deleteTAPDevice(tapName, "") + deleteEnd(err) + if err != nil { return "", fmt.Errorf("delete existing TAP: %w", err) } } @@ -521,32 +534,65 @@ func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName strin Group: uint32(gid), } - if err := netlink.LinkAdd(tap); err != nil { + _, linkAddEnd := startNetworkStep(ctx, "network.create_tap.link_add", + attribute.String("operation", "link_add"), + attribute.String("tap", tapName), + ) + err = netlink.LinkAdd(tap) + linkAddEnd(err) + if err != nil { return "", fmt.Errorf("create TAP device: %w", err) } // 3. Set TAP up + _, linkByNameEnd := startNetworkStep(ctx, "network.create_tap.link_lookup_created", + attribute.String("operation", "link_lookup_created"), + attribute.String("tap", tapName), + ) tapLink, err := netlink.LinkByName(tapName) + linkByNameEnd(err) if err != nil { return "", fmt.Errorf("get TAP link: %w", err) } - if err := netlink.LinkSetUp(tapLink); err != nil { + _, setUpEnd := startNetworkStep(ctx, "network.create_tap.link_set_up", + attribute.String("operation", "link_set_up"), + attribute.String("tap", tapName), + ) + err = netlink.LinkSetUp(tapLink) + setUpEnd(err) + if err != nil { return "", fmt.Errorf("set TAP up: %w", err) } // 4. Attach TAP to bridge + _, bridgeLookupEnd := startNetworkStep(ctx, "network.create_tap.link_lookup_bridge", + attribute.String("operation", "link_lookup_bridge"), + attribute.String("bridge", bridgeName), + ) bridge, err := netlink.LinkByName(bridgeName) + bridgeLookupEnd(err) if err != nil { return "", fmt.Errorf("get bridge: %w", err) } - if err := netlink.LinkSetMaster(tapLink, bridge); err != nil { + _, setMasterEnd := startNetworkStep(ctx, "network.create_tap.link_set_master", + attribute.String("operation", "link_set_master"), + attribute.String("tap", tapName), + attribute.String("bridge", bridgeName), + ) + err = netlink.LinkSetMaster(tapLink, bridge) + setMasterEnd(err) + if err != nil { return "", fmt.Errorf("attach TAP to bridge: %w", err) } // 5. Enable port isolation so isolated TAPs can't directly talk to each other (requires kernel support and capabilities) if isolated { + _, isolationEnd := startNetworkStep(ctx, "network.create_tap.set_isolation", + attribute.String("operation", "set_isolation"), + attribute.String("tap", tapName), + ) // Use shell command for bridge_slave isolated flag // netlink library doesn't expose this flag yet cmd := exec.Command("ip", "link", "set", tapName, "type", "bridge_slave", "isolated", "on") @@ -555,6 +601,7 @@ func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName strin AmbientCaps: []uintptr{unix.CAP_NET_ADMIN}, } output, err := cmd.CombinedOutput() + isolationEnd(err) if err != nil { return "", fmt.Errorf("set isolation mode: %w (output: %s)", err, string(output)) } @@ -562,7 +609,13 @@ func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName strin // 6. Apply download rate limiting (TBF on TAP egress) if downloadBps > 0 { - if err := m.applyDownloadRateLimit(tapName, downloadBps); err != nil { + _, downloadEnd := startNetworkStep(ctx, "network.create_tap.download_rate_limit", + attribute.String("operation", "download_rate_limit"), + attribute.String("tap", tapName), + ) + err := m.applyDownloadRateLimit(ctx, tapName, downloadBps) + downloadEnd(err) + if err != nil { return "", fmt.Errorf("apply download rate limit: %w", err) } } @@ -571,7 +624,13 @@ func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName strin var classID string if uploadBps > 0 { var err error - classID, err = m.addVMClass(ctx, bridgeName, tapName, uploadBps, uploadCeilBps) + uploadCtx, uploadEnd := startNetworkStep(ctx, "network.create_tap.upload_rate_limit", + attribute.String("operation", "upload_rate_limit"), + attribute.String("tap", tapName), + attribute.String("bridge", bridgeName), + ) + classID, err = m.addVMClass(uploadCtx, bridgeName, tapName, uploadBps, uploadCeilBps) + uploadEnd(err) if err != nil { return "", fmt.Errorf("apply upload rate limit: %w", err) } @@ -581,7 +640,7 @@ func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName strin } // applyDownloadRateLimit applies download (external→VM) rate limiting using TBF on TAP egress. -func (m *manager) applyDownloadRateLimit(tapName string, rateLimitBps int64) error { +func (m *manager) applyDownloadRateLimit(ctx context.Context, tapName string, rateLimitBps int64) error { rateStr := formatTcRate(rateLimitBps) // Use Token Bucket Filter (tbf) for download shaping @@ -601,7 +660,12 @@ func (m *manager) applyDownloadRateLimit(tapName string, rateLimitBps int64) err cmd.SysProcAttr = &syscall.SysProcAttr{ AmbientCaps: []uintptr{unix.CAP_NET_ADMIN}, } + _, tcEnd := startNetworkStep(ctx, "network.create_tap.download_rate_limit.tc_qdisc_tbf", + attribute.String("operation", "tc_qdisc_tbf"), + attribute.String("tap", tapName), + ) output, err := cmd.CombinedOutput() + tcEnd(err) if err != nil { return fmt.Errorf("tc qdisc add tbf: %w (output: %s)", err, string(output)) } @@ -696,7 +760,15 @@ func (m *manager) addVMClass(ctx context.Context, bridgeName, tapName string, ra cmd.SysProcAttr = &syscall.SysProcAttr{ AmbientCaps: []uintptr{unix.CAP_NET_ADMIN}, } + _, classAddEnd := startNetworkStep(ctx, "network.create_tap.upload_rate_limit.tc_class_add", + attribute.String("operation", "tc_class_add"), + attribute.String("tap", tapName), + attribute.String("bridge", bridgeName), + attribute.String("class_id", fullClassID), + attribute.Int("attempt", attempt+1), + ) output, err := cmd.CombinedOutput() + classAddEnd(err) if err != nil { // Check for "File exists" collision (exit status 2). var exitErr *exec.ExitError @@ -724,9 +796,21 @@ func (m *manager) addVMClass(ctx context.Context, bridgeName, tapName string, ra qdiscCmd.SysProcAttr = &syscall.SysProcAttr{ AmbientCaps: []uintptr{unix.CAP_NET_ADMIN}, } - qdiscCmd.Run() // Best effort - + _, fqCodelEnd := startNetworkStep(ctx, "network.create_tap.upload_rate_limit.tc_qdisc_fq_codel", + attribute.String("operation", "tc_qdisc_fq_codel"), + attribute.String("tap", tapName), + attribute.String("bridge", bridgeName), + attribute.String("class_id", fullClassID), + ) + fqCodelErr := qdiscCmd.Run() // Best effort + fqCodelEnd(fqCodelErr) + + _, filterLinkEnd := startNetworkStep(ctx, "network.create_tap.upload_rate_limit.link_lookup_filter", + attribute.String("operation", "link_lookup_filter"), + attribute.String("tap", tapName), + ) tapLink, linkErr := netlink.LinkByName(tapName) + filterLinkEnd(linkErr) if linkErr != nil { return "", fmt.Errorf("get TAP link for filter: %w", linkErr) } @@ -739,7 +823,15 @@ func (m *manager) addVMClass(ctx context.Context, bridgeName, tapName string, ra filterCmd.SysProcAttr = &syscall.SysProcAttr{ AmbientCaps: []uintptr{unix.CAP_NET_ADMIN}, } - if output, filterErr := filterCmd.CombinedOutput(); filterErr != nil { + _, filterEnd := startNetworkStep(ctx, "network.create_tap.upload_rate_limit.tc_filter_add", + attribute.String("operation", "tc_filter_add"), + attribute.String("tap", tapName), + attribute.String("bridge", bridgeName), + attribute.String("class_id", fullClassID), + ) + output, filterErr := filterCmd.CombinedOutput() + filterEnd(filterErr) + if filterErr != nil { return "", fmt.Errorf("tc filter add: %w (output: %s)", filterErr, string(output)) } diff --git a/lib/network/derive.go b/lib/network/derive.go index 7970d03e..6fd51336 100644 --- a/lib/network/derive.go +++ b/lib/network/derive.go @@ -9,6 +9,7 @@ import ( "github.com/kernel/hypeman/lib/hypervisor" "github.com/kernel/hypeman/lib/logger" + "go.opentelemetry.io/otel/attribute" ) // instanceMetadata is the minimal metadata we need to derive allocations @@ -123,8 +124,22 @@ func (m *manager) ListAllocations(ctx context.Context) ([]Allocation, error) { // excludeInstanceID allows excluding a specific instance from the check (used when // starting an existing instance to avoid it conflicting with itself). func (m *manager) NameExists(ctx context.Context, name string, excludeInstanceID string) (bool, error) { - allocations, err := m.ListAllocations(ctx) + checkCtx, checkSpanEnd := startNetworkStep(ctx, "network.name_exists", + attribute.String("operation", "name_exists"), + ) + var checkErr error + defer func() { + checkSpanEnd(checkErr) + }() + + listCtx, listSpanEnd := startNetworkStep(checkCtx, "network.list_allocations", + attribute.String("operation", "list_allocations"), + attribute.String("caller", "name_exists"), + ) + allocations, err := m.ListAllocations(listCtx) + listSpanEnd(err) if err != nil { + checkErr = err return false, err } diff --git a/lib/network/tracing.go b/lib/network/tracing.go new file mode 100644 index 00000000..ca36b4d7 --- /dev/null +++ b/lib/network/tracing.go @@ -0,0 +1,36 @@ +package network + +import ( + "context" + + "github.com/kernel/hypeman/lib/hypervisor" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +func startNetworkStep(ctx context.Context, name string, attrs ...attribute.KeyValue) (context.Context, func(error)) { + inherited := hypervisor.TraceAttributesFromContext(ctx) + if len(inherited) > 0 { + merged := make([]attribute.KeyValue, 0, len(inherited)+len(attrs)) + merged = append(merged, inherited...) + merged = append(merged, attrs...) + attrs = merged + } + + opts := []trace.SpanStartOption(nil) + if len(attrs) > 0 { + opts = append(opts, trace.WithAttributes(attrs...)) + } + ctx, span := otel.Tracer("hypeman/network").Start(ctx, name, opts...) + return ctx, func(err error) { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "") + } + span.End() + } +} From b2227befec88e2bb19b52e1aa01721fce9f0192d Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Mon, 1 Jun 2026 00:05:32 -0400 Subject: [PATCH 3/3] Parallelize TAP setup and defer tc shaping --- lib/network/allocate.go | 346 +++++++++++++++++++++++++---------- lib/network/bridge_darwin.go | 10 +- lib/network/bridge_linux.go | 79 ++------ lib/network/derive.go | 77 +++++++- lib/network/manager.go | 53 +++++- lib/network/manager_test.go | 48 +++++ lib/network/tc_rate.go | 20 ++ 7 files changed, 460 insertions(+), 173 deletions(-) create mode 100644 lib/network/tc_rate.go diff --git a/lib/network/allocate.go b/lib/network/allocate.go index 6eb27b95..276039c6 100644 --- a/lib/network/allocate.go +++ b/lib/network/allocate.go @@ -30,116 +30,68 @@ func (m *manager) CreateAllocation(ctx context.Context, req AllocateRequest) (*N return nil, err } - // Acquire lock to prevent concurrent allocations from: - // 1. Picking the same IP address - // 2. Creating duplicate instance names + // Reserve network identity under a short lock, then create host devices + // outside it so concurrent forks do not queue behind TAP setup. waitCtx, waitSpanEnd := startNetworkStep(ctx, "network.create_allocation.wait_for_mutex", attribute.String("operation", "wait_for_mutex"), attribute.String("instance_id", req.InstanceID), ) m.mu.Lock() waitSpanEnd(nil) - defer m.mu.Unlock() lockedCtx, lockedSpanEnd := startNetworkStep(waitCtx, "network.create_allocation.locked", attribute.String("operation", "create_allocation_locked"), attribute.String("instance_id", req.InstanceID), ) - var lockedErr error - defer func() { - lockedSpanEnd(lockedErr) - }() - - // 1. Check name uniqueness (exclude current instance to allow restarts) - exists, err := m.NameExists(lockedCtx, req.InstanceName, req.InstanceID) - if err != nil { - lockedErr = err - return nil, fmt.Errorf("check name exists: %w", err) - } - if exists { - lockedErr = fmt.Errorf("%w: instance name '%s' already exists, can't assign into same network: %s", - ErrNameExists, req.InstanceName, network.Name) - return nil, lockedErr - } - - // 2. Allocate random available IP - // Random selection reduces predictability and helps distribute IPs across the subnet. - // This is especially useful for large /16 networks and reduces conflicts when - // moving standby VMs across hosts. - ip, err := m.allocateNextIP(lockedCtx, network.Subnet) + netConfig, err := m.reserveAllocationIdentityLocked(lockedCtx, network, req) + lockedSpanEnd(err) + m.mu.Unlock() if err != nil { - lockedErr = err - return nil, fmt.Errorf("allocate IP: %w", err) - } - - // 3. Generate unused MAC (02:00:00:... format - locally administered) - mac, err := m.allocateUniqueMAC(lockedCtx) - if err != nil { - lockedErr = err - return nil, fmt.Errorf("allocate MAC: %w", err) + return nil, err } - // 4. Generate TAP name (tap-{first8chars-of-id}) - tap := GenerateTAPName(req.InstanceID) - - // 5. Create TAP device with bidirectional rate limiting - tapCtx, tapSpanEnd := startNetworkStep(lockedCtx, "network.create_tap", + // Create the TAP synchronously because the VMM restore depends on it. + tapCtx, tapSpanEnd := startNetworkStep(ctx, "network.create_tap", attribute.String("operation", "create_tap"), attribute.String("instance_id", req.InstanceID), - attribute.String("tap", tap), + attribute.String("tap", netConfig.TAPDevice), attribute.Bool("isolated", network.Isolated), attribute.Bool("download_rate_limit", req.DownloadBps > 0), attribute.Bool("upload_rate_limit", req.UploadBps > 0), ) - classID, err := m.createTAPDevice(tapCtx, tap, network.Bridge, network.Isolated, req.DownloadBps, req.UploadBps, req.UploadCeilBps) + err = m.createTAPDevice(tapCtx, netConfig.TAPDevice, network.Bridge, network.Isolated) tapSpanEnd(err) if err != nil { - lockedErr = err + m.forgetPendingAllocation(req.InstanceID) + _ = m.deleteTAPDevice(netConfig.TAPDevice, "") return nil, fmt.Errorf("create TAP device: %w", err) } m.recordTAPOperation(ctx, "create") - // Persist assigned tc class ID so removal uses the correct ID after collisions. - // Clear any stale file when no rate limiting was applied. - _, saveClassSpanEnd := startNetworkStep(lockedCtx, "network.save_class_id", - attribute.String("operation", "save_class_id"), - attribute.String("instance_id", req.InstanceID), - attribute.Bool("has_class_id", classID != ""), - ) - if classID != "" { - if err := m.saveClassID(req.InstanceID, classID); err != nil { - saveClassSpanEnd(err) - lockedErr = err - return nil, fmt.Errorf("save class ID: %w", err) - } + if req.DownloadBps > 0 || req.UploadBps > 0 { + m.enqueueRateLimit(ctx, rateLimitRequest{ + instanceID: req.InstanceID, + tapName: netConfig.TAPDevice, + bridgeName: network.Bridge, + downloadBps: req.DownloadBps, + uploadBps: req.UploadBps, + uploadCeilBps: req.UploadCeilBps, + }) } else { m.clearClassID(req.InstanceID) } - saveClassSpanEnd(nil) log.InfoContext(ctx, "allocated network", "instance_id", req.InstanceID, "instance_name", req.InstanceName, "network", "default", - "ip", ip, - "mac", mac, - "tap", tap, + "ip", netConfig.IP, + "mac", netConfig.MAC, + "tap", netConfig.TAPDevice, "download_bps", req.DownloadBps, "upload_bps", req.UploadBps) - // 6. Calculate netmask from subnet - _, ipNet, _ := net.ParseCIDR(network.Subnet) - netmask := fmt.Sprintf("%d.%d.%d.%d", ipNet.Mask[0], ipNet.Mask[1], ipNet.Mask[2], ipNet.Mask[3]) - - // 7. Return config (will be used in CH VmConfig) - return &NetworkConfig{ - IP: ip, - MAC: mac, - Gateway: network.Gateway, - Netmask: netmask, - DNS: m.config.Network.DNSServer, - TAPDevice: tap, - }, nil + return netConfig, nil } // RecreateAllocation recreates TAP for restore from standby @@ -168,18 +120,21 @@ func (m *manager) RecreateAllocation(ctx context.Context, instanceID string, dow // 3. Recreate TAP device with same name and rate limits from instance metadata uploadCeilBps := uploadBps * int64(m.GetUploadBurstMultiplier()) - classID, err := m.createTAPDevice(ctx, alloc.TAPDevice, network.Bridge, network.Isolated, downloadBps, uploadBps, uploadCeilBps) - if err != nil { + if err := m.createTAPDevice(ctx, alloc.TAPDevice, network.Bridge, network.Isolated); err != nil { + _ = m.deleteTAPDevice(alloc.TAPDevice, alloc.ClassID) return fmt.Errorf("create TAP device: %w", err) } m.recordTAPOperation(ctx, "create") - // Persist assigned tc class ID so removal uses the correct ID after collisions. - // Clear any stale file when no rate limiting was applied. - if classID != "" { - if err := m.saveClassID(instanceID, classID); err != nil { - return fmt.Errorf("save class ID: %w", err) - } + if downloadBps > 0 || uploadBps > 0 { + m.enqueueRateLimit(ctx, rateLimitRequest{ + instanceID: instanceID, + tapName: alloc.TAPDevice, + bridgeName: network.Bridge, + downloadBps: downloadBps, + uploadBps: uploadBps, + uploadCeilBps: uploadCeilBps, + }) } else { m.clearClassID(instanceID) } @@ -194,6 +149,183 @@ func (m *manager) RecreateAllocation(ctx context.Context, instanceID string, dow return nil } +func (m *manager) reserveAllocationIdentityLocked(ctx context.Context, network *Network, req AllocateRequest) (*NetworkConfig, error) { + listCtx, listSpanEnd := startNetworkStep(ctx, "network.list_allocations", + attribute.String("operation", "list_allocations"), + attribute.String("caller", "create_allocation"), + ) + allocations, err := m.listAllocationsWithPendingLocked(listCtx) + listSpanEnd(err) + if err != nil { + return nil, fmt.Errorf("list allocations: %w", err) + } + + _, nameSpanEnd := startNetworkStep(ctx, "network.name_exists", + attribute.String("operation", "name_exists"), + attribute.String("instance_id", req.InstanceID), + ) + exists := nameExistsInAllocations(allocations, req.InstanceName, req.InstanceID) + nameSpanEnd(nil) + if exists { + return nil, fmt.Errorf("%w: instance name '%s' already exists, can't assign into same network: %s", + ErrNameExists, req.InstanceName, network.Name) + } + + _, ipSpanEnd := startNetworkStep(ctx, "network.allocate_ip", + attribute.String("operation", "allocate_ip"), + attribute.String("instance_id", req.InstanceID), + ) + ip, err := allocateNextIPFromAllocations(network.Subnet, allocations) + ipSpanEnd(err) + if err != nil { + return nil, fmt.Errorf("allocate IP: %w", err) + } + + _, macSpanEnd := startNetworkStep(ctx, "network.allocate_mac", + attribute.String("operation", "allocate_mac"), + attribute.String("instance_id", req.InstanceID), + ) + mac, err := allocateUniqueMACFromAllocations(allocations, generateMAC) + macSpanEnd(err) + if err != nil { + return nil, fmt.Errorf("allocate MAC: %w", err) + } + + tap := GenerateTAPName(req.InstanceID) + _, ipNet, _ := net.ParseCIDR(network.Subnet) + netmask := fmt.Sprintf("%d.%d.%d.%d", ipNet.Mask[0], ipNet.Mask[1], ipNet.Mask[2], ipNet.Mask[3]) + + m.rememberPendingAllocationLocked(Allocation{ + InstanceID: req.InstanceID, + InstanceName: req.InstanceName, + Network: "default", + IP: ip, + MAC: mac, + TAPDevice: tap, + Gateway: network.Gateway, + Netmask: netmask, + DNS: m.config.Network.DNSServer, + State: "pending", + }) + + return &NetworkConfig{ + IP: ip, + MAC: mac, + Gateway: network.Gateway, + Netmask: netmask, + DNS: m.config.Network.DNSServer, + TAPDevice: tap, + }, nil + +} + +type rateLimitRequest struct { + instanceID string + tapName string + bridgeName string + downloadBps int64 + uploadBps int64 + uploadCeilBps int64 +} + +func (m *manager) enqueueRateLimit(ctx context.Context, req rateLimitRequest) { + ctx = context.WithoutCancel(ctx) + go m.applyRateLimitAsync(ctx, req) +} + +func (m *manager) applyRateLimitAsync(ctx context.Context, req rateLimitRequest) { + log := logger.FromContext(ctx) + + waitCtx, waitSpanEnd := startNetworkStep(ctx, "network.rate_limit.wait_for_tc_mutex", + attribute.String("operation", "wait_for_tc_mutex"), + attribute.String("instance_id", req.instanceID), + attribute.String("tap", req.tapName), + ) + m.tcMu.Lock() + waitSpanEnd(nil) + defer m.tcMu.Unlock() + + applyCtx, applySpanEnd := startNetworkStep(waitCtx, "network.rate_limit.apply", + attribute.String("operation", "apply_rate_limit"), + attribute.String("instance_id", req.instanceID), + attribute.String("tap", req.tapName), + attribute.String("bridge", req.bridgeName), + attribute.Bool("download_rate_limit", req.downloadBps > 0), + attribute.Bool("upload_rate_limit", req.uploadBps > 0), + ) + var applyErr error + defer func() { + applySpanEnd(applyErr) + }() + + if req.downloadBps > 0 { + downloadCtx, downloadEnd := startNetworkStep(applyCtx, "network.rate_limit.download", + attribute.String("operation", "download_rate_limit"), + attribute.String("instance_id", req.instanceID), + attribute.String("tap", req.tapName), + ) + if err := m.applyDownloadRateLimit(downloadCtx, req.tapName, req.downloadBps); err != nil { + downloadEnd(err) + applyErr = err + log.ErrorContext(ctx, "failed to apply async download rate limit", + "instance_id", req.instanceID, + "tap", req.tapName, + "error", err) + } else { + downloadEnd(nil) + } + } + + if req.uploadBps > 0 { + uploadCtx, uploadEnd := startNetworkStep(applyCtx, "network.rate_limit.upload", + attribute.String("operation", "upload_rate_limit"), + attribute.String("instance_id", req.instanceID), + attribute.String("tap", req.tapName), + attribute.String("bridge", req.bridgeName), + ) + classID, err := m.addVMClass(uploadCtx, req.bridgeName, req.tapName, req.uploadBps, req.uploadCeilBps) + uploadEnd(err) + if err != nil { + applyErr = err + log.ErrorContext(ctx, "failed to apply async upload rate limit", + "instance_id", req.instanceID, + "tap", req.tapName, + "bridge", req.bridgeName, + "error", err) + return + } + + _, saveEnd := startNetworkStep(applyCtx, "network.save_class_id", + attribute.String("operation", "save_class_id"), + attribute.String("instance_id", req.instanceID), + attribute.Bool("has_class_id", classID != ""), + ) + if classID != "" { + if err := m.saveClassID(req.instanceID, classID); err != nil { + saveEnd(err) + applyErr = err + log.ErrorContext(ctx, "failed to persist async tc class id", + "instance_id", req.instanceID, + "tap", req.tapName, + "class_id", classID, + "error", err) + return + } + } else { + m.clearClassID(req.instanceID) + } + saveEnd(nil) + } else { + m.clearClassID(req.instanceID) + } + + log.DebugContext(ctx, "async network rate limits applied", + "instance_id", req.instanceID, + "tap", req.tapName, + "download_bps", req.downloadBps, + "upload_bps", req.uploadBps) +} + // ReleaseByInstanceID is a best-effort fallback for cases where the full Allocation // can't be derived (e.g. metadata read failure during stop, or rollback after a // CreateAllocation that succeeded but where downstream metadata writes failed). @@ -221,8 +353,13 @@ func (m *manager) ReleaseAllocation(ctx context.Context, alloc *Allocation) erro if alloc == nil { return nil } + m.forgetPendingAllocation(alloc.InstanceID) - // 1. Delete TAP device (best effort), using stored class ID for correct HTB cleanup + // 1. Delete TAP device (best effort), using stored class ID for correct HTB cleanup. + // Serialize with async tc setup so a queued rate-limit job cannot race with + // class removal and leave stale bridge state behind. + m.tcMu.Lock() + defer m.tcMu.Unlock() if err := m.deleteTAPDevice(alloc.TAPDevice, alloc.ClassID); err != nil { log.WarnContext(ctx, "failed to delete TAP device", "tap", alloc.TAPDevice, "error", err) } else { @@ -240,8 +377,13 @@ func (m *manager) ReleaseAllocation(ctx context.Context, alloc *Allocation) erro // getOrInitDefaultNetwork resolves the default network and self-heals by running // Initialize if bridge state is missing, then retries briefly to absorb netlink propagation delay. func (m *manager) getOrInitDefaultNetwork(ctx context.Context) (*Network, error) { + if network := m.cachedDefaultNetwork(); network != nil { + return network, nil + } + network, err := m.getDefaultNetwork(ctx) if err == nil { + m.setDefaultNetwork(network) return network, nil } @@ -256,6 +398,7 @@ func (m *manager) getOrInitDefaultNetwork(ctx context.Context) (*Network, error) for i := 0; i < retries; i++ { network, err = m.getDefaultNetwork(ctx) if err == nil { + m.setDefaultNetwork(network) return network, nil } time.Sleep(retryDelay) @@ -275,13 +418,6 @@ func (m *manager) allocateNextIP(ctx context.Context, subnet string) (string, er chooseSpanEnd(chooseErr) }() - // Parse subnet - _, ipNet, err := net.ParseCIDR(subnet) - if err != nil { - chooseErr = err - return "", fmt.Errorf("parse subnet: %w", err) - } - // Get all currently allocated IPs listCtx, listSpanEnd := startNetworkStep(chooseCtx, "network.list_allocations", attribute.String("operation", "list_allocations"), @@ -294,6 +430,20 @@ func (m *manager) allocateNextIP(ctx context.Context, subnet string) (string, er return "", fmt.Errorf("list allocations: %w", err) } + ip, err := allocateNextIPFromAllocations(subnet, allocations) + if err != nil { + chooseErr = err + } + return ip, err +} + +func allocateNextIPFromAllocations(subnet string, allocations []Allocation) (string, error) { + // Parse subnet + _, ipNet, err := net.ParseCIDR(subnet) + if err != nil { + return "", fmt.Errorf("parse subnet: %w", err) + } + // Build set of used IPs usedIPs := make(map[string]bool) for _, alloc := range allocations { @@ -343,8 +493,7 @@ func (m *manager) allocateNextIP(ctx context.Context, subnet string) (string, er } } - chooseErr = fmt.Errorf("no available IPs in subnet %s after %d random attempts and full scan", subnet, maxRetries) - return "", chooseErr + return "", fmt.Errorf("no available IPs in subnet %s after %d random attempts and full scan", subnet, maxRetries) } // incrementIP increments IP address by n @@ -396,6 +545,14 @@ func (m *manager) allocateUniqueMAC(ctx context.Context) (string, error) { return "", fmt.Errorf("list allocations: %w", err) } + mac, err := allocateUniqueMACFromAllocations(allocations, generateMAC) + if err != nil { + chooseErr = err + } + return mac, err +} + +func allocateUniqueMACFromAllocations(allocations []Allocation, generate func() (string, error)) (string, error) { usedMACs := make(map[string]bool) for _, alloc := range allocations { mac := strings.ToLower(strings.TrimSpace(alloc.MAC)) @@ -403,12 +560,7 @@ func (m *manager) allocateUniqueMAC(ctx context.Context) (string, error) { usedMACs[mac] = true } } - - mac, err := allocateUniqueMACFromSet(usedMACs, generateMAC) - if err != nil { - chooseErr = err - } - return mac, err + return allocateUniqueMACFromSet(usedMACs, generate) } func allocateUniqueMACFromSet(usedMACs map[string]bool, generate func() (string, error)) (string, error) { diff --git a/lib/network/bridge_darwin.go b/lib/network/bridge_darwin.go index 7721c412..b979f22c 100644 --- a/lib/network/bridge_darwin.go +++ b/lib/network/bridge_darwin.go @@ -36,8 +36,16 @@ func (m *manager) setupBridgeHTB(ctx context.Context, bridgeName string, capacit // createTAPDevice is a no-op on macOS as we use NAT networking. // Virtualization.framework creates virtual network interfaces internally. -func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName string, isolated bool, downloadBps, uploadBps, uploadCeilBps int64) (string, error) { +func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName string, isolated bool) error { // On macOS with vz, network devices are created by the VMM itself + return nil +} + +func (m *manager) applyDownloadRateLimit(ctx context.Context, tapName string, rateLimitBps int64) error { + return nil +} + +func (m *manager) addVMClass(ctx context.Context, bridgeName, tapName string, rateBps, ceilBps int64) (string, error) { return "", nil } diff --git a/lib/network/bridge_linux.go b/lib/network/bridge_linux.go index ddd7a15b..6fab2679 100644 --- a/lib/network/bridge_linux.go +++ b/lib/network/bridge_linux.go @@ -495,11 +495,8 @@ func (m *manager) lastHypemanForwardRulePosition() int { return lastPos } -// createTAPDevice creates TAP device and attaches to bridge. -// downloadBps: rate limit for download (external→VM), applied as TBF on TAP egress -// uploadBps/uploadCeilBps: rate limit for upload (VM→external), applied as HTB class on bridge -// Returns the tc class ID actually assigned (empty if no upload rate limiting). -func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName string, isolated bool, downloadBps, uploadBps, uploadCeilBps int64) (string, error) { +// createTAPDevice creates TAP device and attaches it to the bridge. +func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName string, isolated bool) error { // 1. Check if TAP already exists _, linkLookupEnd := startNetworkStep(ctx, "network.create_tap.link_lookup_existing", attribute.String("operation", "link_lookup_existing"), @@ -516,7 +513,7 @@ func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName strin err := m.deleteTAPDevice(tapName, "") deleteEnd(err) if err != nil { - return "", fmt.Errorf("delete existing TAP: %w", err) + return fmt.Errorf("delete existing TAP: %w", err) } } @@ -541,7 +538,7 @@ func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName strin err = netlink.LinkAdd(tap) linkAddEnd(err) if err != nil { - return "", fmt.Errorf("create TAP device: %w", err) + return fmt.Errorf("create TAP device: %w", err) } // 3. Set TAP up @@ -552,7 +549,7 @@ func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName strin tapLink, err := netlink.LinkByName(tapName) linkByNameEnd(err) if err != nil { - return "", fmt.Errorf("get TAP link: %w", err) + return fmt.Errorf("get TAP link: %w", err) } _, setUpEnd := startNetworkStep(ctx, "network.create_tap.link_set_up", @@ -562,7 +559,7 @@ func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName strin err = netlink.LinkSetUp(tapLink) setUpEnd(err) if err != nil { - return "", fmt.Errorf("set TAP up: %w", err) + return fmt.Errorf("set TAP up: %w", err) } // 4. Attach TAP to bridge @@ -573,7 +570,7 @@ func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName strin bridge, err := netlink.LinkByName(bridgeName) bridgeLookupEnd(err) if err != nil { - return "", fmt.Errorf("get bridge: %w", err) + return fmt.Errorf("get bridge: %w", err) } _, setMasterEnd := startNetworkStep(ctx, "network.create_tap.link_set_master", @@ -584,7 +581,7 @@ func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName strin err = netlink.LinkSetMaster(tapLink, bridge) setMasterEnd(err) if err != nil { - return "", fmt.Errorf("attach TAP to bridge: %w", err) + return fmt.Errorf("attach TAP to bridge: %w", err) } // 5. Enable port isolation so isolated TAPs can't directly talk to each other (requires kernel support and capabilities) @@ -603,40 +600,11 @@ func (m *manager) createTAPDevice(ctx context.Context, tapName, bridgeName strin output, err := cmd.CombinedOutput() isolationEnd(err) if err != nil { - return "", fmt.Errorf("set isolation mode: %w (output: %s)", err, string(output)) + return fmt.Errorf("set isolation mode: %w (output: %s)", err, string(output)) } } - // 6. Apply download rate limiting (TBF on TAP egress) - if downloadBps > 0 { - _, downloadEnd := startNetworkStep(ctx, "network.create_tap.download_rate_limit", - attribute.String("operation", "download_rate_limit"), - attribute.String("tap", tapName), - ) - err := m.applyDownloadRateLimit(ctx, tapName, downloadBps) - downloadEnd(err) - if err != nil { - return "", fmt.Errorf("apply download rate limit: %w", err) - } - } - - // 7. Apply upload rate limiting (HTB class on bridge) - var classID string - if uploadBps > 0 { - var err error - uploadCtx, uploadEnd := startNetworkStep(ctx, "network.create_tap.upload_rate_limit", - attribute.String("operation", "upload_rate_limit"), - attribute.String("tap", tapName), - attribute.String("bridge", bridgeName), - ) - classID, err = m.addVMClass(uploadCtx, bridgeName, tapName, uploadBps, uploadCeilBps) - uploadEnd(err) - if err != nil { - return "", fmt.Errorf("apply upload rate limit: %w", err) - } - } - - return classID, nil + return nil } // applyDownloadRateLimit applies download (external→VM) rate limiting using TBF on TAP egress. @@ -660,7 +628,7 @@ func (m *manager) applyDownloadRateLimit(ctx context.Context, tapName string, ra cmd.SysProcAttr = &syscall.SysProcAttr{ AmbientCaps: []uintptr{unix.CAP_NET_ADMIN}, } - _, tcEnd := startNetworkStep(ctx, "network.create_tap.download_rate_limit.tc_qdisc_tbf", + _, tcEnd := startNetworkStep(ctx, "network.rate_limit.download.tc_qdisc_tbf", attribute.String("operation", "tc_qdisc_tbf"), attribute.String("tap", tapName), ) @@ -760,7 +728,7 @@ func (m *manager) addVMClass(ctx context.Context, bridgeName, tapName string, ra cmd.SysProcAttr = &syscall.SysProcAttr{ AmbientCaps: []uintptr{unix.CAP_NET_ADMIN}, } - _, classAddEnd := startNetworkStep(ctx, "network.create_tap.upload_rate_limit.tc_class_add", + _, classAddEnd := startNetworkStep(ctx, "network.rate_limit.upload.tc_class_add", attribute.String("operation", "tc_class_add"), attribute.String("tap", tapName), attribute.String("bridge", bridgeName), @@ -796,7 +764,7 @@ func (m *manager) addVMClass(ctx context.Context, bridgeName, tapName string, ra qdiscCmd.SysProcAttr = &syscall.SysProcAttr{ AmbientCaps: []uintptr{unix.CAP_NET_ADMIN}, } - _, fqCodelEnd := startNetworkStep(ctx, "network.create_tap.upload_rate_limit.tc_qdisc_fq_codel", + _, fqCodelEnd := startNetworkStep(ctx, "network.rate_limit.upload.tc_qdisc_fq_codel", attribute.String("operation", "tc_qdisc_fq_codel"), attribute.String("tap", tapName), attribute.String("bridge", bridgeName), @@ -805,7 +773,7 @@ func (m *manager) addVMClass(ctx context.Context, bridgeName, tapName string, ra fqCodelErr := qdiscCmd.Run() // Best effort fqCodelEnd(fqCodelErr) - _, filterLinkEnd := startNetworkStep(ctx, "network.create_tap.upload_rate_limit.link_lookup_filter", + _, filterLinkEnd := startNetworkStep(ctx, "network.rate_limit.upload.link_lookup_filter", attribute.String("operation", "link_lookup_filter"), attribute.String("tap", tapName), ) @@ -823,7 +791,7 @@ func (m *manager) addVMClass(ctx context.Context, bridgeName, tapName string, ra filterCmd.SysProcAttr = &syscall.SysProcAttr{ AmbientCaps: []uintptr{unix.CAP_NET_ADMIN}, } - _, filterEnd := startNetworkStep(ctx, "network.create_tap.upload_rate_limit.tc_filter_add", + _, filterEnd := startNetworkStep(ctx, "network.rate_limit.upload.tc_filter_add", attribute.String("operation", "tc_filter_add"), attribute.String("tap", tapName), attribute.String("bridge", bridgeName), @@ -914,23 +882,6 @@ func deriveClassID(tapName string) string { return fmt.Sprintf("%04x", deriveClassIDVal(tapName)) } -// formatTcRate formats bytes per second as a tc rate string. -// It uses the largest unit that exactly represents the value to avoid -// truncation from integer division (e.g., 2.5 Gbps becomes "2500mbit" not "2gbit"). -func formatTcRate(bytesPerSec int64) string { - bitsPerSec := bytesPerSec * 8 - switch { - case bitsPerSec >= 1000000000 && bitsPerSec%1000000000 == 0: - return fmt.Sprintf("%dgbit", bitsPerSec/1000000000) - case bitsPerSec >= 1000000 && bitsPerSec%1000000 == 0: - return fmt.Sprintf("%dmbit", bitsPerSec/1000000) - case bitsPerSec >= 1000 && bitsPerSec%1000 == 0: - return fmt.Sprintf("%dkbit", bitsPerSec/1000) - default: - return fmt.Sprintf("%dbit", bitsPerSec) - } -} - // deleteTAPDevice removes TAP device and its associated HTB class on the bridge. // If classID is non-empty, it is used for removeVMClass; otherwise falls back to deriveClassID. func (m *manager) deleteTAPDevice(tapName, classID string) error { diff --git a/lib/network/derive.go b/lib/network/derive.go index 6fd51336..d1d2fc88 100644 --- a/lib/network/derive.go +++ b/lib/network/derive.go @@ -3,9 +3,11 @@ package network import ( "context" "encoding/json" + "errors" "fmt" "net" "os" + "time" "github.com/kernel/hypeman/lib/hypervisor" "github.com/kernel/hypeman/lib/logger" @@ -94,7 +96,22 @@ func (m *manager) deriveAllocation(ctx context.Context, instanceID string) (*All // GetAllocation gets the allocation for a specific instance func (m *manager) GetAllocation(ctx context.Context, instanceID string) (*Allocation, error) { - return m.deriveAllocation(ctx, instanceID) + alloc, err := m.deriveAllocation(ctx, instanceID) + if err == nil && alloc != nil { + return alloc, nil + } + if err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, err + } + + m.mu.Lock() + defer m.mu.Unlock() + m.prunePendingAllocationsLocked(time.Now()) + if pending, ok := m.pendingAllocations[instanceID]; ok { + alloc := pending.allocation + return &alloc, nil + } + return alloc, err } // ListAllocations scans all guest directories and derives allocations @@ -124,6 +141,9 @@ func (m *manager) ListAllocations(ctx context.Context) ([]Allocation, error) { // excludeInstanceID allows excluding a specific instance from the check (used when // starting an existing instance to avoid it conflicting with itself). func (m *manager) NameExists(ctx context.Context, name string, excludeInstanceID string) (bool, error) { + m.mu.Lock() + defer m.mu.Unlock() + checkCtx, checkSpanEnd := startNetworkStep(ctx, "network.name_exists", attribute.String("operation", "name_exists"), ) @@ -136,23 +156,70 @@ func (m *manager) NameExists(ctx context.Context, name string, excludeInstanceID attribute.String("operation", "list_allocations"), attribute.String("caller", "name_exists"), ) - allocations, err := m.ListAllocations(listCtx) + allocations, err := m.listAllocationsWithPendingLocked(listCtx) listSpanEnd(err) if err != nil { checkErr = err return false, err } + return nameExistsInAllocations(allocations, name, excludeInstanceID), nil +} + +func (m *manager) listAllocationsWithPendingLocked(ctx context.Context) ([]Allocation, error) { + allocations, err := m.ListAllocations(ctx) + if err != nil { + return nil, err + } + + m.prunePendingAllocationsLocked(time.Now()) + seen := make(map[string]struct{}, len(allocations)) + for _, alloc := range allocations { + seen[alloc.InstanceID] = struct{}{} + } + for id, pending := range m.pendingAllocations { + if _, ok := seen[id]; ok { + continue + } + allocations = append(allocations, pending.allocation) + } + return allocations, nil +} + +func (m *manager) rememberPendingAllocationLocked(alloc Allocation) { + if m.pendingAllocations == nil { + m.pendingAllocations = make(map[string]pendingAllocation) + } + m.pendingAllocations[alloc.InstanceID] = pendingAllocation{ + allocation: alloc, + expiresAt: time.Now().Add(pendingAllocationTTL), + } +} + +func (m *manager) forgetPendingAllocation(instanceID string) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.pendingAllocations, instanceID) +} + +func (m *manager) prunePendingAllocationsLocked(now time.Time) { + for id, pending := range m.pendingAllocations { + if now.After(pending.expiresAt) { + delete(m.pendingAllocations, id) + } + } +} + +func nameExistsInAllocations(allocations []Allocation, name, excludeInstanceID string) bool { for _, alloc := range allocations { - // Skip the excluded instance (e.g., when restarting an instance) if excludeInstanceID != "" && alloc.InstanceID == excludeInstanceID { continue } if alloc.InstanceName == name { - return true, nil + return true } } - return false, nil + return false } // loadInstanceMetadata loads minimal instance metadata diff --git a/lib/network/manager.go b/lib/network/manager.go index 333f3478..5e7ec449 100644 --- a/lib/network/manager.go +++ b/lib/network/manager.go @@ -49,18 +49,30 @@ type Manager interface { // manager implements the Manager interface type manager struct { - paths *paths.Paths - config *config.Config - mu sync.Mutex // Protects network allocation operations (IP allocation) - metrics *Metrics + paths *paths.Paths + config *config.Config + mu sync.Mutex // Protects network identity reservation. + networkMu sync.RWMutex + defaultNetwork *Network + pendingAllocations map[string]pendingAllocation + tcMu sync.Mutex // Serializes shared bridge tc mutations. + metrics *Metrics } +type pendingAllocation struct { + allocation Allocation + expiresAt time.Time +} + +const pendingAllocationTTL = 10 * time.Minute + // NewManager creates a new network manager. // If meter is nil, metrics are disabled. func NewManager(p *paths.Paths, cfg *config.Config, meter metric.Meter) Manager { m := &manager{ - paths: p, - config: cfg, + paths: p, + config: cfg, + pendingAllocations: make(map[string]pendingAllocation), } // Initialize metrics if meter is provided @@ -104,6 +116,15 @@ func (m *manager) Initialize(ctx context.Context, runningInstanceIDs []string) e if err := m.createBridge(ctx, m.config.Network.BridgeName, gateway, m.config.Network.SubnetCIDR); err != nil { return fmt.Errorf("setup default network: %w", err) } + m.setDefaultNetwork(&Network{ + Name: "default", + Subnet: m.config.Network.SubnetCIDR, + Gateway: gateway, + Bridge: m.config.Network.BridgeName, + // Per-TAP port isolation is the default network policy used by createTAPDevice. + Isolated: true, + Default: true, + }) // Cleanup orphaned TAP devices from previous runs (crashes, power loss, etc.). // Startup runs before any concurrent CreateAllocation can be in flight, so no @@ -121,6 +142,26 @@ func (m *manager) Initialize(ctx context.Context, runningInstanceIDs []string) e return nil } +func cloneNetwork(network *Network) *Network { + if network == nil { + return nil + } + cloned := *network + return &cloned +} + +func (m *manager) cachedDefaultNetwork() *Network { + m.networkMu.RLock() + defer m.networkMu.RUnlock() + return cloneNetwork(m.defaultNetwork) +} + +func (m *manager) setDefaultNetwork(network *Network) { + m.networkMu.Lock() + defer m.networkMu.Unlock() + m.defaultNetwork = cloneNetwork(network) +} + // getDefaultNetwork gets the default network details from kernel state func (m *manager) getDefaultNetwork(ctx context.Context) (*Network, error) { // Query from kernel diff --git a/lib/network/manager_test.go b/lib/network/manager_test.go index 8d21226d..8a391236 100644 --- a/lib/network/manager_test.go +++ b/lib/network/manager_test.go @@ -1,9 +1,12 @@ package network import ( + "context" "net" "testing" + "github.com/kernel/hypeman/cmd/api/config" + "github.com/kernel/hypeman/lib/paths" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -66,6 +69,51 @@ func TestAllocateUniqueMACFromSetFallsBackToSequentialScan(t *testing.T) { assert.Equal(t, macAllocationRandomAttempts, calls) } +func TestPendingAllocationVisibleToNameExistsAndGetAllocation(t *testing.T) { + m := &manager{ + paths: paths.New(t.TempDir()), + config: &config.Config{}, + pendingAllocations: make(map[string]pendingAllocation), + } + + m.mu.Lock() + m.rememberPendingAllocationLocked(Allocation{ + InstanceID: "inst-pending", + InstanceName: "pending-name", + IP: "10.100.0.42", + MAC: "02:00:00:00:00:42", + TAPDevice: "hype-pending", + State: "pending", + }) + m.mu.Unlock() + + exists, err := m.NameExists(context.Background(), "pending-name", "") + require.NoError(t, err) + assert.True(t, exists) + + alloc, err := m.GetAllocation(context.Background(), "inst-pending") + require.NoError(t, err) + require.NotNil(t, alloc) + assert.Equal(t, "10.100.0.42", alloc.IP) + assert.Equal(t, "pending", alloc.State) +} + +func TestDefaultNetworkCacheReturnsCopy(t *testing.T) { + m := &manager{} + m.setDefaultNetwork(&Network{ + Name: "default", + Bridge: "hm0", + Subnet: "10.244.0.0/16", + Gateway: "10.244.0.1", + }) + + cached := m.cachedDefaultNetwork() + require.NotNil(t, cached) + cached.Bridge = "mutated" + + assert.Equal(t, "hm0", m.cachedDefaultNetwork().Bridge) +} + func TestGenerateTAPName(t *testing.T) { tests := []struct { name string diff --git a/lib/network/tc_rate.go b/lib/network/tc_rate.go new file mode 100644 index 00000000..86b52e76 --- /dev/null +++ b/lib/network/tc_rate.go @@ -0,0 +1,20 @@ +package network + +import "fmt" + +// formatTcRate formats bytes per second as a tc rate string. +// It uses the largest unit that exactly represents the value to avoid +// truncation from integer division (e.g., 2.5 Gbps becomes "2500mbit" not "2gbit"). +func formatTcRate(bytesPerSec int64) string { + bitsPerSec := bytesPerSec * 8 + switch { + case bitsPerSec >= 1000000000 && bitsPerSec%1000000000 == 0: + return fmt.Sprintf("%dgbit", bitsPerSec/1000000000) + case bitsPerSec >= 1000000 && bitsPerSec%1000000 == 0: + return fmt.Sprintf("%dmbit", bitsPerSec/1000000) + case bitsPerSec >= 1000 && bitsPerSec%1000 == 0: + return fmt.Sprintf("%dkbit", bitsPerSec/1000) + default: + return fmt.Sprintf("%dbit", bitsPerSec) + } +}