Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ All notable changes to this project will be documented in this file.

- Smartcontract
- Migrate read callers in the CLI, sentinel, client, controlplane admin, and Rust SDK topology helper to read interfaces from `Device::new_interfaces` instead of the legacy `interfaces` enum vec, and adopt the `Device::find_interface` signature that returns `&NewInterface`. The legacy `interfaces` slot is still written on-disk via the per-write V2 projection from #3667; this PR only migrates reads. The temporary `Device::find_interface_legacy` helper is retained for the smartcontract program processors, which migrate in a later issue. Activator is intentionally excluded — it is deprecated ([#3659](https://github.com/malbeclabs/doublezero/issues/3659))
- Client
- Add periodic kernel route reconciliation to `doublezerod` that detects and reinstalls missing routes, with a metric tracking install failures ([#3669](https://github.com/malbeclabs/doublezero/issues/3669))
- Activator
- Delete the `activator/` crate from the workspace; onchain allocation (RFC-11) supersedes it. The deployed activator was frozen in Phase 1 ([#3608](https://github.com/malbeclabs/doublezero/pull/3608), [#3628](https://github.com/malbeclabs/doublezero/pull/3628)) and removed from e2e in Phase 2 ([#3609](https://github.com/malbeclabs/doublezero/pull/3609), [#3610](https://github.com/malbeclabs/doublezero/pull/3610), [#3611](https://github.com/malbeclabs/doublezero/pull/3611), [#3629](https://github.com/malbeclabs/doublezero/pull/3629)). The `*/activate`, `*/reject`, and `*/closeaccount` onchain instructions and their SDK command modules remain in place for older CLIs until the min-version gate ([#3612](https://github.com/malbeclabs/doublezero/issues/3612))
- Smartcontract
Expand Down
30 changes: 17 additions & 13 deletions client/doublezerod/cmd/doublezerod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ var (
stateDir = flag.String("state-dir", "/var/lib/doublezerod", "directory for persistent state files")

// Route liveness configuration flags.
routeLivenessTxMin = flag.Duration("route-liveness-tx-min", defaultRouteLivenessTxMin, "route liveness tx min")
routeLivenessRxMin = flag.Duration("route-liveness-rx-min", defaultRouteLivenessRxMin, "route liveness rx min")
routeLivenessDetectMult = flag.Uint("route-liveness-detect-mult", defaultRouteLivenessDetectMult, "route liveness detect mult")
routeLivenessMinTxFloor = flag.Duration("route-liveness-min-tx-floor", defaultRouteLivenessMinTxFloor, "route liveness min tx floor")
routeLivenessMaxTxCeil = flag.Duration("route-liveness-max-tx-ceil", defaultRouteLivenessMaxTxCeil, "route liveness max tx ceil")
routeLivenessPeerMetrics = flag.Bool("route-liveness-peer-metrics", false, "enables per peer metrics for route liveness (high cardinality)")
routeLivenessDebug = flag.Bool("route-liveness-debug", false, "enables debug logging for route liveness")
routeLivenessTxMin = flag.Duration("route-liveness-tx-min", defaultRouteLivenessTxMin, "route liveness tx min")
routeLivenessRxMin = flag.Duration("route-liveness-rx-min", defaultRouteLivenessRxMin, "route liveness rx min")
routeLivenessDetectMult = flag.Uint("route-liveness-detect-mult", defaultRouteLivenessDetectMult, "route liveness detect mult")
routeLivenessMinTxFloor = flag.Duration("route-liveness-min-tx-floor", defaultRouteLivenessMinTxFloor, "route liveness min tx floor")
routeLivenessMaxTxCeil = flag.Duration("route-liveness-max-tx-ceil", defaultRouteLivenessMaxTxCeil, "route liveness max tx ceil")
routeLivenessReconcileInterval = flag.Duration("route-liveness-reconcile-interval", defaultRouteLivenessReconcileInterval, "interval for periodic kernel route reconciliation; 0 disables")
routeLivenessPeerMetrics = flag.Bool("route-liveness-peer-metrics", false, "enables per peer metrics for route liveness (high cardinality)")
routeLivenessDebug = flag.Bool("route-liveness-debug", false, "enables debug logging for route liveness")

// TODO(snormore): These flags are temporary for initial rollout testing.
// They will be superseded by a single `route-liveness-enable` flag, where false means
Expand All @@ -66,12 +67,13 @@ var (
)

const (
defaultOnchainRPCTimeout = 30 * time.Second
defaultRouteLivenessTxMin = 1 * time.Second
defaultRouteLivenessRxMin = 1 * time.Second
defaultRouteLivenessDetectMult = 3
defaultRouteLivenessMinTxFloor = 50 * time.Millisecond
defaultRouteLivenessMaxTxCeil = 3 * time.Second
defaultOnchainRPCTimeout = 30 * time.Second
defaultRouteLivenessTxMin = 1 * time.Second
defaultRouteLivenessRxMin = 1 * time.Second
defaultRouteLivenessDetectMult = 3
defaultRouteLivenessMinTxFloor = 50 * time.Millisecond
defaultRouteLivenessMaxTxCeil = 3 * time.Second
defaultRouteLivenessReconcileInterval = 30 * time.Second

defaultRouteLivenessBindIP = "0.0.0.0"
)
Expand Down Expand Up @@ -179,6 +181,8 @@ func main() {

EnablePeerMetrics: *routeLivenessPeerMetrics,

RouteReconcileInterval: *routeLivenessReconcileInterval,

// Default to treating peers that advertise passive mode as passive. That is, we will
// install their routes immediately and never uninstall them on down events.
HonorPeerAdvertisedPassive: true,
Expand Down
118 changes: 116 additions & 2 deletions client/doublezerod/internal/liveness/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/malbeclabs/doublezero/client/doublezerod/internal/routing"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sys/unix"
)

const (
Expand All @@ -25,6 +26,9 @@ const (
defaultBackoffMax = 1 * time.Minute

defaultMaxEvents = 10240

// Default interval for periodic kernel route reconciliation.
defaultRouteReconcileInterval = 30 * time.Second
)

// Peer identifies a remote endpoint and the local interface context used to reach it.
Expand Down Expand Up @@ -92,6 +96,10 @@ type ManagerConfig struct {

// Client version to advertise to peers in control packets.
ClientVersion string

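// RouteReconcileInterval controls how often the manager scans the kernel
// routing table for missing routes and reinstalls them. It must be
// non-negative. Zero is intended to disable reconciliation.
// NOTE(review): Validate currently replaces zero with
// defaultRouteReconcileInterval, so a zero value ends up selecting the default
// instead of disabling — confirm which behavior is wanted.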
RouteReconcileInterval time.Duration
}

// Validate fills defaults and enforces constraints for ManagerConfig.
Expand Down Expand Up @@ -151,6 +159,12 @@ func (c *ManagerConfig) Validate() error {
if c.ClientVersion == "" {
return errors.New("clientVersion is required")
}
if c.RouteReconcileInterval < 0 {
return errors.New("routeReconcileInterval must be non-negative")
}
if c.RouteReconcileInterval == 0 {
c.RouteReconcileInterval = defaultRouteReconcileInterval
}
return nil
}

Expand Down Expand Up @@ -291,6 +305,26 @@ func NewManager(ctx context.Context, cfg *ManagerConfig, cr *routing.ConfiguredR
}
}()

// Route reconciliation goroutine: periodically scans the kernel routing
// table for missing routes and reinstalls them.
if cfg.RouteReconcileInterval > 0 {
log.Info("liveness: route reconciliation enabled", "interval", cfg.RouteReconcileInterval.String())
m.wg.Add(1)
go func() {
defer m.wg.Done()
ticker := time.NewTicker(cfg.RouteReconcileInterval)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
m.reconcileRoutes()
}
}
}()
}

// If any routes are configured to be excluded, mark them as AdminDown immediately.
if m.cr != nil {
for ip := range m.cr.GetExcluded() {
Expand Down Expand Up @@ -834,7 +868,7 @@ func (m *manager) onSessionDown(sess *Session) {
}

if m.cfg.PassiveMode {
m.log.Debug("liveness: session down (global passive; keeping route)",
m.log.Info("liveness: session down (global passive; keeping route)",
"peer", peer.String(),
"route", snap.Route.String(),
"downSince", snap.DownSince.UTC().String(),
Expand All @@ -845,7 +879,7 @@ func (m *manager) onSessionDown(sess *Session) {
}

if effectivelyPassive {
m.log.Debug("liveness: session down (peer passive; keeping route)",
m.log.Info("liveness: session down (peer passive; keeping route)",
"peer", peer.String(),
"route", snap.Route.String(),
"downSince", snap.DownSince.UTC().String(),
Expand Down Expand Up @@ -875,6 +909,86 @@ func (m *manager) onSessionDown(sess *Session) {
)
}

// reconcileRoutes re-adds kernel routes that the manager has marked as
// installed but that no longer appear among the kernel's RTPROT_BGP routes.
// It is a mitigation for routes being removed by something other than this
// manager.
//
// The work happens in three steps:
//  1. Under m.mu, collect every route key that is marked installed and also
//     has an entry in m.desired.
//  2. Without the lock, fetch the kernel routes once and index them by
//     (table, dst, nexthop, src).
//  3. For each collected route absent from that index, re-check m.installed
//     under the lock and, if still installed, call RouteAdd and record the
//     outcome in metrics.
//
// Errors are logged, never returned; a failure to list kernel routes aborts
// the whole pass.
func (m *manager) reconcileRoutes() {
	// candidate pairs a route key with the desired route it refers to.
	type candidate struct {
		key   RouteKey
		route *Route
	}

	// Copy what we need out of the shared maps so the netlink calls below run
	// without holding m.mu.
	m.mu.Lock()
	candidates := make([]candidate, 0, len(m.installed))
	for key, isInstalled := range m.installed {
		if !isInstalled {
			continue
		}
		want, found := m.desired[key]
		if !found {
			continue
		}
		candidates = append(candidates, candidate{key: key, route: want})
	}
	m.mu.Unlock()

	if len(candidates) == 0 {
		return
	}

	inKernel, err := m.cfg.Netlinker.RouteByProtocol(unix.RTPROT_BGP)
	if err != nil {
		m.log.Error("liveness: error fetching kernel routes for reconciliation", "error", err)
		return
	}

	// routeID is the identity used to match a kernel route against a tracked
	// one. Address fields are IPv4 strings; an address that is absent or not
	// IPv4 is left as the empty string.
	type routeID struct {
		Table   int
		DstIP   string
		NextHop string
		SrcIP   string
	}
	present := make(map[routeID]struct{}, len(inKernel))
	for _, kr := range inKernel {
		id := routeID{Table: kr.Table}
		if kr.Dst != nil {
			if v4 := kr.Dst.IP.To4(); v4 != nil {
				id.DstIP = v4.String()
			}
		}
		if v4 := kr.NextHop.To4(); v4 != nil {
			id.NextHop = v4.String()
		}
		if v4 := kr.Src.To4(); v4 != nil {
			id.SrcIP = v4.String()
		}
		present[id] = struct{}{}
	}

	for _, c := range candidates {
		id := routeID{
			Table:   c.route.Table,
			DstIP:   c.key.DstPrefix,
			NextHop: c.key.NextHop,
			SrcIP:   c.key.SrcIP,
		}
		if _, ok := present[id]; ok {
			continue
		}

		// The snapshot may be stale: the route could have been deliberately
		// withdrawn since it was taken (e.g. by onSessionDown), so consult
		// m.installed again before touching the kernel.
		m.mu.Lock()
		stillInstalled := m.installed[c.key]
		m.mu.Unlock()
		if !stillInstalled {
			continue
		}

		m.log.Warn("liveness: reinstalling missing route",
			"route", c.route.String(),
			"iface", c.key.Interface,
		)
		addErr := m.cfg.Netlinker.RouteAdd(&c.route.Route)
		if addErr == nil {
			m.metrics.routeReinstall(c.key.Interface, c.key.SrcIP)
			continue
		}
		m.log.Error("liveness: error reinstalling route",
			"error", addErr, "route", c.route.String())
		m.metrics.RouteInstallFailures.WithLabelValues(c.key.Interface, c.key.SrcIP).Inc()
	}
}

// isPeerEffectivelyPassive returns true when this session should not have its
// dataplane (kernel route) managed due to peer-advertised passive mode.
//
Expand Down
124 changes: 124 additions & 0 deletions client/doublezerod/internal/liveness/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1650,6 +1650,130 @@ func metricHasLabels(m *prom.Metric, labels prometheus.Labels) bool {
return true
}

// TestClient_Liveness_Manager_ReconcileRoutes_ReinstallsMissing checks that a
// route registered in passive mode is re-added by reconcileRoutes when the
// kernel reports no routes at all, and that the reinstall counter is bumped.
func TestClient_Liveness_Manager_ReconcileRoutes_ReinstallsMissing(t *testing.T) {
	t.Parallel()

	addCalls := 0
	mock := &MockRouteReaderWriter{
		RouteAddFunc: func(r *routing.Route) error {
			addCalls++
			return nil
		},
		RouteByProtocolFunc: func(int) ([]*routing.Route, error) {
			// Return empty — no routes in kernel.
			return nil, nil
		},
	}

	m, reg, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) {
		cfg.Netlinker = mock
		cfg.PassiveMode = true
		// A long interval keeps the background ticker from firing during the
		// test; reconcileRoutes is invoked by hand below.
		cfg.RouteReconcileInterval = time.Hour
	})
	require.NoError(t, err)
	t.Cleanup(func() { _ = m.Close() })

	r := newTestRoute(nil)
	err = m.RegisterRoute(r, "lo", m.LocalAddr().Port)
	require.NoError(t, err)

	// RegisterRoute in PassiveMode calls RouteAdd once; reset the counter so
	// only calls made by reconciliation are counted.
	mock.mu.Lock()
	addCalls = 0
	mock.mu.Unlock()

	m.reconcileRoutes()

	// Copy the counter out before asserting: require.* aborts the test
	// goroutine on failure, which would leave mock.mu locked while cleanup
	// (m.Close) still runs.
	mock.mu.Lock()
	gotAdds := addCalls
	mock.mu.Unlock()
	require.Equal(t, 1, gotAdds, "expected one RouteAdd call to reinstall the missing route")

	reinstalls := getCounterValue(t, reg, "doublezero_liveness_route_reinstalls_total",
		prometheus.Labels{LabelIface: "lo", LabelLocalIP: r.Src.To4().String()})
	require.Equal(t, float64(1), reinstalls)
}

// TestClient_Liveness_Manager_ReconcileRoutes_SkipsPresent checks that
// reconcileRoutes does not call RouteAdd for a route the kernel already
// reports.
func TestClient_Liveness_Manager_ReconcileRoutes_SkipsPresent(t *testing.T) {
	t.Parallel()

	r := newTestRoute(nil)
	addCalls := 0
	mock := &MockRouteReaderWriter{
		RouteAddFunc: func(rr *routing.Route) error {
			addCalls++
			return nil
		},
		RouteByProtocolFunc: func(int) ([]*routing.Route, error) {
			// Return the route as present in kernel.
			return []*routing.Route{&r.Route}, nil
		},
	}

	m, _, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) {
		cfg.Netlinker = mock
		cfg.PassiveMode = true
		// Long interval: the test drives reconcileRoutes manually.
		cfg.RouteReconcileInterval = time.Hour
	})
	require.NoError(t, err)
	t.Cleanup(func() { _ = m.Close() })

	err = m.RegisterRoute(r, "lo", m.LocalAddr().Port)
	require.NoError(t, err)

	// Reset after RegisterRoute's install.
	mock.mu.Lock()
	addCalls = 0
	mock.mu.Unlock()

	m.reconcileRoutes()

	// Read the counter under the lock but assert after releasing it, so a
	// failing require cannot exit the test with mock.mu still held.
	mock.mu.Lock()
	gotAdds := addCalls
	mock.mu.Unlock()
	require.Equal(t, 0, gotAdds, "should not reinstall a route that is present in the kernel")
}

// TestClient_Liveness_Manager_ReconcileRoutes_SkipsUninstalled checks that
// reconcileRoutes leaves alone a route that was registered in active mode and
// has not been installed, even though the kernel reports no routes.
func TestClient_Liveness_Manager_ReconcileRoutes_SkipsUninstalled(t *testing.T) {
	t.Parallel()

	addCalls := 0
	mock := &MockRouteReaderWriter{
		RouteAddFunc: func(r *routing.Route) error {
			addCalls++
			return nil
		},
		RouteByProtocolFunc: func(int) ([]*routing.Route, error) {
			return nil, nil
		},
	}

	// Active mode: route is registered but not installed until session goes Up.
	m, _, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) {
		cfg.Netlinker = mock
		cfg.PassiveMode = false
		// Long interval: the test drives reconcileRoutes manually.
		cfg.RouteReconcileInterval = time.Hour
	})
	require.NoError(t, err)
	t.Cleanup(func() { _ = m.Close() })

	r := newTestRoute(func(r *Route) {
		r.Src = net.IPv4(127, 0, 0, 1)
		r.Dst = &net.IPNet{IP: net.IPv4(127, 0, 0, 2), Mask: net.CIDRMask(32, 32)}
	})
	err = m.RegisterRoute(r, "lo", m.LocalAddr().Port)
	require.NoError(t, err)

	// In active mode, installed[rk] is false until session goes Up.
	mock.mu.Lock()
	addCalls = 0
	mock.mu.Unlock()

	m.reconcileRoutes()

	// Read the counter under the lock but assert after releasing it, so a
	// failing require cannot exit the test with mock.mu still held.
	mock.mu.Lock()
	gotAdds := addCalls
	mock.mu.Unlock()
	require.Equal(t, 0, gotAdds, "should not reinstall a route that was never installed")
}

func getHistogramCount(t *testing.T, reg *prometheus.Registry, name string, labels prometheus.Labels) float64 {
t.Helper()

Expand Down
Loading
Loading