From 866155a99b5f3702c7522e536c49ef909e4a633d Mon Sep 17 00:00:00 2001 From: Ben Blier Date: Wed, 20 May 2026 14:48:36 -0400 Subject: [PATCH 1/5] tools/twamp/signed: add opt-in challenged ProbePair mode alongside the existing fast path --- .../cmd/geoprobe-target-sender/main.go | 2 +- tools/twamp/pkg/signed/packet.go | 24 ++++++ tools/twamp/pkg/signed/sender.go | 6 +- tools/twamp/pkg/signed/sender_linux.go | 63 +++++++++++++- tools/twamp/pkg/signed/sender_test.go | 86 +++++++++++++++++-- tools/twamp/pkg/signed/stub_fallback.go | 2 +- 6 files changed, 171 insertions(+), 12 deletions(-) diff --git a/controlplane/telemetry/cmd/geoprobe-target-sender/main.go b/controlplane/telemetry/cmd/geoprobe-target-sender/main.go index 84f898ee29..f98b197283 100644 --- a/controlplane/telemetry/cmd/geoprobe-target-sender/main.go +++ b/controlplane/telemetry/cmd/geoprobe-target-sender/main.go @@ -115,7 +115,7 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() - sender, err := signed.NewSender(ctx, "", localAddr, remoteAddr, signer, remotePubkey) + sender, err := signed.NewSender(ctx, "", localAddr, remoteAddr, signer, remotePubkey, false) if err != nil { log.Error("failed to create sender", "error", err) os.Exit(1) diff --git a/tools/twamp/pkg/signed/packet.go b/tools/twamp/pkg/signed/packet.go index b9a7d96223..21322b2378 100644 --- a/tools/twamp/pkg/signed/packet.go +++ b/tools/twamp/pkg/signed/packet.go @@ -131,6 +131,30 @@ func NewProbePacket(seq uint32, signer Signer) *ProbePacket { return p } +// newChallengedProbePacket constructs a probe whose Sec/Frac carry the +// challenge nonce (BigEndian: Sec = upper 4 bytes, Frac = lower 4 bytes) +// rather than an NTP timestamp. The signature is computed over the +// nonce-bearing payload. Used by senders running in challenged mode. +func newChallengedProbePacket(seq uint32, signer Signer, nonce uint64) *ProbePacket { + pub := signer.Public() + p := &ProbePacket{ + Seq: seq, + Sec: uint32(nonce >> 32), + Frac: uint32(nonce & 0xFFFFFFFF), + } + copy(p.SenderPubkey[:], pub) + + var payload [probePayloadSize]byte + binary.BigEndian.PutUint32(payload[0:4], p.Seq) + binary.BigEndian.PutUint32(payload[4:8], p.Sec) + binary.BigEndian.PutUint32(payload[8:12], p.Frac) + copy(payload[12:44], p.SenderPubkey[:]) + + sig := signer.Sign(payload[:]) + copy(p.Signature[:], sig) + return p +} + func (p *ProbePacket) Marshal(buf []byte) error { if len(buf) < ProbePacketSize { return fmt.Errorf("buffer too small: %d < %d", len(buf), ProbePacketSize) diff --git a/tools/twamp/pkg/signed/sender.go b/tools/twamp/pkg/signed/sender.go index a1ab491a93..552edeb8bf 100644 --- a/tools/twamp/pkg/signed/sender.go +++ b/tools/twamp/pkg/signed/sender.go @@ -24,6 +24,8 @@ type Sender interface { } // NewSender creates a signed TWAMP sender. Only localAddr.Port is used; any IP is ignored. -func NewSender(ctx context.Context, iface string, localAddr, remoteAddr *net.UDPAddr, signer Signer, remotePubkey [32]byte) (Sender, error) { - return NewLinuxSender(ctx, iface, localAddr, remoteAddr, signer, remotePubkey) +// When challenged is true, ProbePair uses the challenge-response flow: Probe 1 is signed +// with the nonce echoed from Reply 0 rather than an NTP timestamp. +func NewSender(ctx context.Context, iface string, localAddr, remoteAddr *net.UDPAddr, signer Signer, remotePubkey [32]byte, challenged bool) (Sender, error) { + return NewLinuxSender(ctx, iface, localAddr, remoteAddr, signer, remotePubkey, challenged) } diff --git a/tools/twamp/pkg/signed/sender_linux.go b/tools/twamp/pkg/signed/sender_linux.go index 6c617a1cf5..454b85c46c 100644 --- a/tools/twamp/pkg/signed/sender_linux.go +++ b/tools/twamp/pkg/signed/sender_linux.go @@ -33,6 +33,7 @@ type LinuxSender struct { oob []byte mu sync.Mutex logger *slog.Logger + challenged bool } func (s *LinuxSender) SetLogger(logger *slog.Logger) { @@ -40,7 +41,8 @@ func (s *LinuxSender) SetLogger(logger *slog.Logger) { } // NewLinuxSender creates a signed TWAMP sender. Only local.Port is used; any IP is ignored. -func NewLinuxSender(ctx context.Context, iface string, local *net.UDPAddr, remote *net.UDPAddr, signer Signer, remotePubkey [32]byte) (*LinuxSender, error) { +// When challenged is true, ProbePair uses the challenge-response flow. +func NewLinuxSender(ctx context.Context, iface string, local *net.UDPAddr, remote *net.UDPAddr, signer Signer, remotePubkey [32]byte, challenged bool) (*LinuxSender, error) { fd, err := unix.Socket(unix.AF_INET, unix.SOCK_DGRAM|unix.SOCK_NONBLOCK, unix.IPPROTO_UDP) if err != nil { return nil, fmt.Errorf("socket: %w", err) @@ -99,6 +101,7 @@ func NewLinuxSender(ctx context.Context, iface string, local *net.UDPAddr, remot cancel: cancel, buf: make([]byte, 1500), oob: make([]byte, 512), + challenged: challenged, } return s, nil @@ -121,12 +124,20 @@ func (s *LinuxSender) Probe(ctx context.Context) (time.Duration, *ReplyPacket, e return s.sendAndRecv(ctx, buf[:], probe, true, 0) } -// ProbePair sends two probes in quick succession. Both probe packets are +// ProbePair dispatches to the unchallenged or challenged path based on s.challenged. +func (s *LinuxSender) ProbePair(ctx context.Context) (ProbePairResult, error) { + if s.challenged { + return s.probePairChallenged(ctx) + } + return s.probePairUnchallenged(ctx) +} + +// probePairUnchallenged sends two probes in quick succession. Both probe packets are // pre-created and pre-signed before any network I/O. When reply 0 arrives, // probe 1 is fired immediately after a 4-byte seq check — before any // parsing — to minimize the target turnaround that inflates the reflector's // Tx-to-Rx measurement. -func (s *LinuxSender) ProbePair(ctx context.Context) (ProbePairResult, error) { +func (s *LinuxSender) probePairUnchallenged(ctx context.Context) (ProbePairResult, error) { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -299,6 +310,52 @@ func (s *LinuxSender) ProbePair(ctx context.Context) (ProbePairResult, error) { } } +// probePairChallenged sends Probe 0, fully parses Reply 0 to extract the +// nonce, builds and signs Probe 1 with the nonce in Sec/Frac, sends it, +// then awaits Reply 1. The added sender-side compute time between Reply 0 +// arrival and Probe 1 transmission inflates Reply 1.SinceLastRxNs by that +// amount; this is the documented trade-off (RFC16, Challenge-Response +// Inbound Probing). +func (s *LinuxSender) probePairChallenged(ctx context.Context) (ProbePairResult, error) { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + s.mu.Lock() + defer s.mu.Unlock() + + s.seq++ + probe0 := NewProbePacket(s.seq, s.signer) + var buf0 [ProbePacketSize]byte + if err := probe0.Marshal(buf0[:]); err != nil { + return ProbePairResult{}, fmt.Errorf("marshal probe 0: %w", err) + } + + rtt0, reply0, err := s.sendAndRecv(ctx, buf0[:], probe0, true, busyPollWindow) + if err != nil { + return ProbePairResult{}, fmt.Errorf("probe 0: %w", err) + } + + s.seq++ + nonce := reply0.SinceLastRxNs + probe1 := newChallengedProbePacket(s.seq, s.signer, nonce) + var buf1 [ProbePacketSize]byte + if err := probe1.Marshal(buf1[:]); err != nil { + return ProbePairResult{}, fmt.Errorf("marshal probe 1: %w", err) + } + + rtt1, reply1, err := s.sendAndRecv(ctx, buf1[:], probe1, true, busyPollWindow) + if err != nil { + return ProbePairResult{}, fmt.Errorf("probe 1: %w", err) + } + + return ProbePairResult{ + RTT0: rtt0, + RTT1: rtt1, + Reply0: reply0, + Reply1: reply1, + }, nil +} + // sendAndRecv sends a probe and waits for the matching reply. // Caller must hold s.mu and have called runtime.LockOSThread(). func (s *LinuxSender) sendAndRecv(ctx context.Context, probeBuf []byte, probe *ProbePacket, verify bool, busyPollDuration time.Duration) (time.Duration, *ReplyPacket, error) { diff --git a/tools/twamp/pkg/signed/sender_test.go b/tools/twamp/pkg/signed/sender_test.go index 04891219e3..df6fd50d3c 100644 --- a/tools/twamp/pkg/signed/sender_test.go +++ b/tools/twamp/pkg/signed/sender_test.go @@ -44,7 +44,7 @@ func TestSender_Linux(t *testing.T) { time.Sleep(10 * time.Millisecond) - sender, err := signed.NewLinuxSender(t.Context(), "", nil, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: int(reflector.Port())}, senderSigner, reflectorPubKey) + sender, err := signed.NewLinuxSender(t.Context(), "", nil, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: int(reflector.Port())}, senderSigner, reflectorPubKey, false) require.NoError(t, err) defer sender.Close() @@ -66,7 +66,7 @@ func TestSender_Linux(t *testing.T) { var remotePubkey [32]byte addr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 19999} - sender, err := signed.NewLinuxSender(t.Context(), "", nil, addr, senderSigner, remotePubkey) + sender, err := signed.NewLinuxSender(t.Context(), "", nil, addr, senderSigner, remotePubkey, false) require.NoError(t, err) defer sender.Close() @@ -101,7 +101,7 @@ func TestSender_Linux(t *testing.T) { time.Sleep(10 * time.Millisecond) - sender, err := signed.NewLinuxSender(t.Context(), "", nil, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: int(reflector.Port())}, senderSigner, reflectorPubKey) + sender, err := signed.NewLinuxSender(t.Context(), "", nil, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: int(reflector.Port())}, senderSigner, reflectorPubKey, false) require.NoError(t, err) defer sender.Close() @@ -140,7 +140,7 @@ func TestSender_Linux(t *testing.T) { time.Sleep(10 * time.Millisecond) - sender, err := signed.NewLinuxSender(t.Context(), "", nil, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: int(reflector.Port())}, senderSigner, reflectorPubKey) + sender, err := signed.NewLinuxSender(t.Context(), "", nil, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: int(reflector.Port())}, senderSigner, reflectorPubKey, false) require.NoError(t, err) defer sender.Close() @@ -159,6 +159,82 @@ func TestSender_Linux(t *testing.T) { assert.Greater(t, result.Reply1.SinceLastRxNs, uint64(0), "reply 1 should have non-zero SinceLastRxNs") }) + t.Run("ProbePair challenged mode", func(t *testing.T) { + t.Parallel() + + senderPub, senderSigner := newTestSigner(t) + reflectorPub, reflectorSigner := newTestSigner(t) + geoprobePub, _ := newTestSigner(t) + + var senderPubKey [32]byte + copy(senderPubKey[:], senderPub) + var reflectorPubKey [32]byte + copy(reflectorPubKey[:], reflectorPub) + var geoprobePubKey [32]byte + copy(geoprobePubKey[:], geoprobePub) + + reflector, err := signed.NewLinuxReflector("127.0.0.1:0", 100*time.Millisecond, reflectorSigner, geoprobePubKey, [][32]byte{senderPubKey}, 0) + require.NoError(t, err) + defer reflector.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go func() { + _ = reflector.Run(ctx) + }() + + time.Sleep(10 * time.Millisecond) + + sender, err := signed.NewLinuxSender(t.Context(), "", nil, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: int(reflector.Port())}, senderSigner, reflectorPubKey, true) + require.NoError(t, err) + defer sender.Close() + + result, err := sender.ProbePair(ctx) + require.NoError(t, err) + + assert.True(t, result.Reply1.Challenged, "expected Reply 1 to be flagged Challenged in challenged mode") + assert.NotEqual(t, uint64(0), result.Reply0.SinceLastRxNs, "expected non-zero Reply 0 SinceLastRxNs (nonce)") + assert.Greater(t, result.Reply1.SinceLastRxNs, uint64(0), "expected non-zero Reply 1 SinceLastRxNs (Tx-to-Rx interval)") + }) + + t.Run("ProbePair unchallenged mode is still unchallenged", func(t *testing.T) { + t.Parallel() + + senderPub, senderSigner := newTestSigner(t) + reflectorPub, reflectorSigner := newTestSigner(t) + geoprobePub, _ := newTestSigner(t) + + var senderPubKey [32]byte + copy(senderPubKey[:], senderPub) + var reflectorPubKey [32]byte + copy(reflectorPubKey[:], reflectorPub) + var geoprobePubKey [32]byte + copy(geoprobePubKey[:], geoprobePub) + + reflector, err := signed.NewLinuxReflector("127.0.0.1:0", 100*time.Millisecond, reflectorSigner, geoprobePubKey, [][32]byte{senderPubKey}, 0) + require.NoError(t, err) + defer reflector.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go func() { + _ = reflector.Run(ctx) + }() + + time.Sleep(10 * time.Millisecond) + + sender, err := signed.NewLinuxSender(t.Context(), "", nil, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: int(reflector.Port())}, senderSigner, reflectorPubKey, false) + require.NoError(t, err) + defer sender.Close() + + result, err := sender.ProbePair(ctx) + require.NoError(t, err) + + assert.False(t, result.Reply1.Challenged, "expected Reply 1 NOT to be flagged Challenged in unchallenged mode") + }) + t.Run("multiple sequential probes", func(t *testing.T) { t.Parallel() @@ -183,7 +259,7 @@ func TestSender_Linux(t *testing.T) { time.Sleep(10 * time.Millisecond) - sender, err := signed.NewLinuxSender(t.Context(), "", nil, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: int(reflector.Port())}, senderSigner, reflectorPubKey) + sender, err := signed.NewLinuxSender(t.Context(), "", nil, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: int(reflector.Port())}, senderSigner, reflectorPubKey, false) require.NoError(t, err) defer sender.Close() diff --git a/tools/twamp/pkg/signed/stub_fallback.go b/tools/twamp/pkg/signed/stub_fallback.go index 9705260583..6f20e86bb8 100644 --- a/tools/twamp/pkg/signed/stub_fallback.go +++ b/tools/twamp/pkg/signed/stub_fallback.go @@ -12,7 +12,7 @@ import ( var errPlatformNotSupported = errors.New("platform not supported") -func NewLinuxSender(_ context.Context, _ string, _, _ *net.UDPAddr, _ Signer, _ [32]byte) (Sender, error) { +func NewLinuxSender(_ context.Context, _ string, _, _ *net.UDPAddr, _ Signer, _ [32]byte, _ bool) (Sender, error) { return nil, errPlatformNotSupported } From 019735d5396bc7812127ea9e11e57744496370cf Mon Sep 17 00:00:00 2001 From: Ben Blier Date: Wed, 20 May 2026 14:56:32 -0400 Subject: [PATCH 2/5] telemetry/geoprobe-target-sender: add --challenged flag for nonce-echo inbound probing --- .../cmd/geoprobe-target-sender/main.go | 10 ++++- .../cmd/geoprobe-target-sender/main_test.go | 40 +++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/controlplane/telemetry/cmd/geoprobe-target-sender/main.go b/controlplane/telemetry/cmd/geoprobe-target-sender/main.go index f98b197283..a97322e997 100644 --- a/controlplane/telemetry/cmd/geoprobe-target-sender/main.go +++ b/controlplane/telemetry/cmd/geoprobe-target-sender/main.go @@ -47,6 +47,7 @@ var ( logFormat = flag.String("log-format", "text", "Log format: text or json") verbose = flag.Bool("verbose", false, "Enable debug logging") showVersion = flag.Bool("version", false, "Print version and exit") + challenged = flag.Bool("challenged", false, "Use challenge-response inbound probing (nonce echo). Adds sender-side compute time between Reply 0 and Probe 1; only set when the geoprobe agent supports RFC16 challenged inbound. Default: off (preserves the fast pre-signed flow).") version = "dev" commit = "none" @@ -77,6 +78,7 @@ func main() { "timeout", *timeout, "count", *count, "max_measurement_age", *maxMeasurementAge, + "challenged", *challenged, ) cache := geoprobe.NewMinCache[measurement](*maxMeasurementAge, func(m measurement) uint64 { @@ -115,7 +117,7 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() - sender, err := signed.NewSender(ctx, "", localAddr, remoteAddr, signer, remotePubkey, false) + sender, err := signed.NewSender(ctx, "", localAddr, remoteAddr, signer, remotePubkey, *challenged) if err != nil { log.Error("failed to create sender", "error", err) os.Exit(1) @@ -190,7 +192,8 @@ func probePair(ctx context.Context, log *slog.Logger, sender signed.Sender, seq "reply0_probe_sig", reply0ProbeSigValid, "reply0_sig", reply0SigValid, "reply1_probe_sig", reply1ProbeSigValid, - "reply1_sig", reply1SigValid) + "reply1_sig", reply1SigValid, + "challenged", result.Reply1.Challenged) m := measurement{ probeMeasuredRttNs: probeMeasuredRttNs, @@ -235,6 +238,7 @@ func logPairedResult(log *slog.Logger, seq uint32, probeMeasuredRttNs uint64, ta MaxDistanceMiles: distMiles, MaxDistanceKm: distMiles * kmPerMile, SinceLastRxNs: reply.SinceLastRxNs, + Challenged: reply.Challenged, AuthorityPubkey: authorityPK.String(), GeoprobePubkey: geoprobePK.String(), Offsets: offsets, @@ -320,6 +324,7 @@ type probeOutput struct { MaxDistanceMiles float64 `json:"max_distance_miles,omitempty"` MaxDistanceKm float64 `json:"max_distance_km,omitempty"` SinceLastRxNs uint64 `json:"since_last_rx_ns,omitempty"` + Challenged bool `json:"challenged"` AuthorityPubkey string `json:"authority_pubkey,omitempty"` GeoprobePubkey string `json:"geoprobe_pubkey,omitempty"` Offsets []offsetOutput `json:"offsets,omitempty"` @@ -419,6 +424,7 @@ func formatTextResult(seq uint32, probeMeasuredRttNs uint64, targetMeasuredRtt t fmt.Fprintf(&sb, "[%s] Probe Pair #%d\n", time.Now().UTC().Format("2006-01-02 15:04:05 MST"), seq) fmt.Fprintf(&sb, " Probe-Measured RTT: %s\n", formatNsAsMs(probeMeasuredRttNs)) fmt.Fprintf(&sb, " Target-Measured RTT: %s\n", formatRTT(targetMeasuredRtt)) + fmt.Fprintf(&sb, " Challenged Inbound: %v\n", reply.Challenged) accumDistMiles := calculateMaxDistance(reply.RttNs) accumDistKm := accumDistMiles * kmPerMile fmt.Fprintf(&sb, " Reference Point: %s\n", formatCoordinate(reply.Lat, reply.Lng)) diff --git a/controlplane/telemetry/cmd/geoprobe-target-sender/main_test.go b/controlplane/telemetry/cmd/geoprobe-target-sender/main_test.go index df80341a5c..eeffa98ead 100644 --- a/controlplane/telemetry/cmd/geoprobe-target-sender/main_test.go +++ b/controlplane/telemetry/cmd/geoprobe-target-sender/main_test.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "crypto/ed25519" "encoding/json" "testing" @@ -193,6 +194,45 @@ func TestProbeOutput_SuccessJSON_OmitsError(t *testing.T) { } } +func TestProbeOutput_ChallengedFieldRoundTrip(t *testing.T) { + out := probeOutput{ + Timestamp: "2026-05-20T12:00:00Z", + Seq: 42, + Challenged: true, + } + data, err := json.Marshal(out) + if err != nil { + t.Fatalf("Marshal: %v", err) + } + var back probeOutput + if err := json.Unmarshal(data, &back); err != nil { + t.Fatalf("Unmarshal: %v", err) + } + if !back.Challenged { + t.Fatalf("Challenged field did not round-trip; got JSON: %s", data) + } + if !bytes.Contains(data, []byte(`"challenged":true`)) { + t.Fatalf("expected JSON to contain \"challenged\":true; got: %s", data) + } +} + +// Challenged is intentionally NOT omitempty: false is the default mode and +// downstream consumers must be able to distinguish "challenged off" from +// "field absent because the binary predates this PR". +func TestProbeOutput_ChallengedFalseStillSerialized(t *testing.T) { + out := probeOutput{ + Timestamp: "2026-05-20T12:00:00Z", + Seq: 43, + } + data, err := json.Marshal(out) + if err != nil { + t.Fatalf("Marshal: %v", err) + } + if !bytes.Contains(data, []byte(`"challenged":false`)) { + t.Fatalf("expected JSON to contain \"challenged\":false; got: %s", data) + } +} + func TestNewEd25519Signer_Integration(t *testing.T) { wallet := solana.NewWallet() signer := signed.NewEd25519Signer(ed25519.PrivateKey(wallet.PrivateKey)) From 63e70862f7edca8dc54d24f75efefef9eacbcbec Mon Sep 17 00:00:00 2001 From: Ben Blier Date: Wed, 20 May 2026 15:41:26 -0400 Subject: [PATCH 3/5] e2e/geoprobe: cover --challenged inbound path in TestE2E_GeoprobeDiscovery Reuses the existing devnet setup: target-sender runs once unchallenged then once with --challenged, asserting the JSON output for each shows the expected challenged flag. runTargetSender is now synchronous and takes a log path + challenged flag so the two runs don't overlap on shared sender state. Adds a verifyInterval knob on geoprobeAgentOpts (set to 1s for this test) so the reflector's default 29s rate-limit window doesn't drop the second run's probes. --- e2e/geoprobe_test.go | 80 +++++++++++++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 24 deletions(-) diff --git a/e2e/geoprobe_test.go b/e2e/geoprobe_test.go index fb279249c1..c4c1ca89c3 100644 --- a/e2e/geoprobe_test.go +++ b/e2e/geoprobe_test.go @@ -262,6 +262,9 @@ func TestE2E_GeoprobeDiscovery(t *testing.T) { dn.Manager.GeolocationProgramID, dn.Manager.ServiceabilityProgramID, &geoprobeAgentOpts{ probeInterval: 5 * time.Second, + // Short verify-interval so the two sequential target-sender runs + // (unchallenged then challenged) aren't rate-limited. + verifyInterval: 1 * time.Second, }) log.Debug("==> Geoprobe agent started", "pubkey", agent.agentPubkey) @@ -280,15 +283,27 @@ func TestE2E_GeoprobeDiscovery(t *testing.T) { verifyClickhouseOffsets(t, chContainerID) log.Debug("==> ClickHouse verification passed") - // --- Inbound flow --- + // --- Inbound flow (unchallenged) --- // Run target-sender from the target container to send signed TWAMP probes to the agent. // The agent should reply with cached DZD offsets embedded in signed replies. - log.Debug("==> Running target-sender for inbound probing") - runTargetSender(t, targetContainerID, geoprobeIPStr, agent.agentPubkey, senderKeypairPath) - - log.Debug("==> Waiting for successful inbound probe with offsets") - waitForInboundProbeSuccess(t, targetContainerID, 120*time.Second) - log.Debug("==> Inbound probing verified") + log.Debug("==> Running target-sender for inbound probing (unchallenged)") + const unchallengedLog = "/tmp/target-sender-unchallenged.log" + runTargetSender(t, targetContainerID, geoprobeIPStr, agent.agentPubkey, senderKeypairPath, unchallengedLog, false) + + log.Debug("==> Waiting for successful unchallenged inbound probe with offsets") + waitForInboundProbeSuccess(t, targetContainerID, unchallengedLog, false, 120*time.Second) + log.Debug("==> Unchallenged inbound probing verified") + + // --- Inbound flow (challenged) --- + // Re-run target-sender with --challenged so the sender echoes the Reply 0 nonce + // into Probe 1 (RFC16 challenge-response). The agent should flag Reply 1.Challenged=true. + log.Debug("==> Running target-sender for inbound probing (challenged)") + const challengedLog = "/tmp/target-sender-challenged.log" + runTargetSender(t, targetContainerID, geoprobeIPStr, agent.agentPubkey, senderKeypairPath, challengedLog, true) + + log.Debug("==> Waiting for successful challenged inbound probe with offsets") + waitForInboundProbeSuccess(t, targetContainerID, challengedLog, true, 120*time.Second) + log.Debug("==> Challenged inbound probing verified") } // getExchangePK retrieves the onchain PK for an exchange by its code. @@ -365,8 +380,9 @@ func addGeoprobeParent(t *testing.T, dn *devnet.Devnet, code, devicePK string) { } type geoprobeAgentOpts struct { - probeInterval time.Duration - capNetRaw bool // Add CAP_NET_RAW for ICMP probing. + probeInterval time.Duration + verifyInterval time.Duration // Rate-limit window for inbound probing (0 = agent default). + capNetRaw bool // Add CAP_NET_RAW for ICMP probing. } // geoprobeAgentResult holds the container ID and generated pubkeys from starting a geoprobe agent. @@ -406,6 +422,9 @@ func startGeoprobeAgent(t *testing.T, log *slog.Logger, dn *devnet.Devnet, cyoaI if opts != nil && opts.probeInterval > 0 { cmd = append(cmd, "-probe-interval", opts.probeInterval.String()) } + if opts != nil && opts.verifyInterval > 0 { + cmd = append(cmd, "-verify-interval", opts.verifyInterval.String()) + } req := testcontainers.ContainerRequest{ Image: geoprobeImage, @@ -624,14 +643,20 @@ func waitForTargetOffsetReceived(t *testing.T, containerID string, timeout time. }, timeout, 5*time.Second, "Expected geoprobe-target to log 'received LocationOffset' with 'signature_valid=true'") } -// runTargetSender starts geoprobe-target-sender in the target container, sending -// signed TWAMP probes to the agent's reflector. It runs with --count 3 so it -// exits after 3 probe pairs. -func runTargetSender(t *testing.T, containerID, agentIP, agentPubkey, keypairPath string) { +// runTargetSender runs geoprobe-target-sender in the target container to send 3 +// signed TWAMP probe pairs to the agent's reflector, blocking until the process +// exits. Writes JSON output to logPath; pass challenged=true to exercise the +// RFC16 nonce-echo path. +func runTargetSender(t *testing.T, containerID, agentIP, agentPubkey, keypairPath, logPath string, challenged bool) { t.Helper() + challengedFlag := "" + if challenged { + challengedFlag = "-challenged " + } + cmd := fmt.Sprintf( - "nohup doublezero-geoprobe-target-sender "+ + "doublezero-geoprobe-target-sender "+ "-probe-ip %s "+ "-probe-port 8924 "+ "-probe-pk %s "+ @@ -640,8 +665,9 @@ func runTargetSender(t *testing.T, containerID, agentIP, agentPubkey, keypairPat "-interval 5s "+ "-log-format json "+ "-verbose "+ - "> /tmp/target-sender.log 2>&1 &", - agentIP, agentPubkey, keypairPath, + "%s"+ + "> %s 2>&1", + agentIP, agentPubkey, keypairPath, challengedFlag, logPath, ) _, err := docker.Exec(t.Context(), dockerClient, containerID, []string{"bash", "-c", cmd}) require.NoError(t, err) @@ -653,10 +679,10 @@ func runTargetSender(t *testing.T, containerID, agentIP, agentPubkey, keypairPat ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() output, err := docker.Exec(ctx, dockerClient, containerID, []string{ - "bash", "-c", "cat /tmp/target-sender.log 2>/dev/null || echo 'no log file'", + "bash", "-c", fmt.Sprintf("cat %s 2>/dev/null || echo 'no log file'", logPath), }) if err == nil { - fmt.Fprintf(os.Stderr, "\n--- Target-sender log ---\n%s\n", string(output)) + fmt.Fprintf(os.Stderr, "\n--- Target-sender log (%s) ---\n%s\n", logPath, string(output)) } }) } @@ -1001,21 +1027,23 @@ func verifyClickhouseOffsets(t *testing.T, chContainerID string) { }, 60*time.Second, 5*time.Second, "Expected location_offsets to contain rows with valid signatures and non-zero rtt_ns") } -// waitForInboundProbeSuccess polls the target-sender log for a successful probe pair -// where reply signatures are valid and DZD offset data is present. -func waitForInboundProbeSuccess(t *testing.T, containerID string, timeout time.Duration) { +// waitForInboundProbeSuccess polls the target-sender log at logPath for a successful +// probe pair where reply signatures are valid, DZD offset data is present, and the +// challenged flag matches wantChallenged. +func waitForInboundProbeSuccess(t *testing.T, containerID, logPath string, wantChallenged bool, timeout time.Duration) { t.Helper() require.Eventually(t, func() bool { output, err := docker.Exec(t.Context(), dockerClient, containerID, []string{ - "bash", "-c", "cat /tmp/target-sender.log 2>/dev/null", + "bash", "-c", fmt.Sprintf("cat %s 2>/dev/null", logPath), }) if err != nil { return false } logStr := string(output) - // Look for a JSON line with valid reply signatures and non-empty offsets. + // Look for a JSON line with valid reply signatures, non-empty offsets, and + // the expected challenged flag. for _, line := range strings.Split(logStr, "\n") { line = strings.TrimSpace(line) if !strings.HasPrefix(line, "{") { @@ -1023,6 +1051,7 @@ func waitForInboundProbeSuccess(t *testing.T, containerID string, timeout time.D } var result struct { Reply1SigValid bool `json:"reply1_sig_valid"` + Challenged bool `json:"challenged"` Offsets []struct { SigValid bool `json:"sig_valid"` } `json:"offsets"` @@ -1034,12 +1063,15 @@ func waitForInboundProbeSuccess(t *testing.T, containerID string, timeout time.D if result.Error != "" || !result.Reply1SigValid { continue } + if result.Challenged != wantChallenged { + continue + } if len(result.Offsets) > 0 && result.Offsets[0].SigValid { return true } } return false - }, timeout, 5*time.Second, "Expected target-sender log to contain a successful probe pair with valid signatures and offsets") + }, timeout, 5*time.Second, "Expected target-sender log at %s to contain a successful probe pair with challenged=%v, valid signatures, and offsets", logPath, wantChallenged) } // TestE2E_GeoprobeResultDestination verifies that composite offsets are routed to a From 9f2949272fbf38ebbc5ce8d09c69b22f31a4a9a9 Mon Sep 17 00:00:00 2001 From: Ben Blier Date: Thu, 21 May 2026 08:53:29 -0400 Subject: [PATCH 4/5] changelog: add --challenged target-sender flag entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 429d6056b7..419b289332 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ All notable changes to this project will be documented in this file. - gnmi-writer now stores per-interface unicast/multicast/broadcast packet counters (in/out) and FCS errors in the `interface_state` table, bringing it closer to parity with the interface counters already collected in InfluxDB. These already stream in the gNMI interface-state subtree, so there is no added collection cost; existing `interface_state` rows read back `0` for the new columns - Remove the InfluxDB writer from global-monitor; ClickHouse is now the sole telemetry backend, dropping the `INFLUX_*` env var configuration - Signed-TWAMP reflector in `geoprobe-agent` issues a per-pair 8-byte challenge nonce in `Reply0.SinceLastRxNs` and flags `Reply1.NumOffsets` bit 7 (new `Challenged` field on `ReplyPacket`) when `Probe1.Sec || Frac` echoes the nonce — proves the sender received Reply 0 before sending Probe 1, closing the pre-emit-Probe-1 attack on `SinceLastRxNs`. Backwards-compatible: legacy senders never echo the nonce so the flag bit stays 0, and they always ignored Reply 0's previously-zero `SinceLastRxNs`. Documented in RFC16's new "Challenge-Response Inbound Probing" subsection ([#3737](https://github.com/malbeclabs/doublezero/pull/3737)) + - `geoprobe-target-sender` gains opt-in `--challenged` flag (default off). When set, the sender extracts the nonce from `Reply0.SinceLastRxNs`, writes it into `Probe1.Sec || Frac`, signs Probe 1 only after Reply 0 is parsed, and surfaces `Reply1.Challenged` on every per-pair log line (JSON `"challenged"`, text `Challenged Inbound:`). Default off preserves the existing pre-sign-both / fire-Probe-1-immediately fast path byte-for-byte. Trade-off: challenged mode inflates `Reply1.SinceLastRxNs` by the sender's Probe 1 signing latency ([#3738](https://github.com/malbeclabs/doublezero/pull/3738)) - CLI - Drop the activator-only pollers from `doublezero` (user and multicastgroup activation waits). The `--wait` flag on `user create`, `user create-subscribe`, `user subscribe`, `multicastgroup create`, and `multicastgroup update` now fetches the post-create state once instead of polling; creates are atomic to `Activated` post-RFC-11, so the wait loop was watching a transition that no longer happens ([#3614](https://github.com/malbeclabs/doublezero/issues/3614)) - `doublezero geolocation` `probe ...` and `user ...` mirrors `doublezero-geolocation` versions; new `--geo-program-id` global flag, `config get/set` include Geolocation Program ID; new `-init-geolocation-config` for init of geolocation program From 56dd529854afc646eaacaec3b5916c9c4999d67b Mon Sep 17 00:00:00 2001 From: Ben Blier Date: Thu, 21 May 2026 16:36:12 -0400 Subject: [PATCH 5/5] tools/twamp/signed: align challenged Reply 0 verification with unchallenged path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit probePairUnchallenged returns Reply 0 to the caller regardless of signature validity; the caller's .Verify() then surfaces sig failures as reply0_sig_valid=false in the per-pair log. probePairChallenged was passing verify=true on the same call, so a bad-sig Reply 0 was silently dropped and the operator saw a generic timeout instead. Switch to verify=false for Reply 0 to match. Reply 1 stays verify=true to match the unchallenged path's Reply 1 (both silently drop bad-sig Reply 1). A spoofed Reply 0 just produces a bogus nonce; the legitimate reflector won't recognize it, so Reply 1 either doesn't arrive or arrives without Challenged=true — the security model holds. --- tools/twamp/pkg/signed/sender_linux.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tools/twamp/pkg/signed/sender_linux.go b/tools/twamp/pkg/signed/sender_linux.go index 454b85c46c..2c4a49bd5f 100644 --- a/tools/twamp/pkg/signed/sender_linux.go +++ b/tools/twamp/pkg/signed/sender_linux.go @@ -330,7 +330,13 @@ func (s *LinuxSender) probePairChallenged(ctx context.Context) (ProbePairResult, return ProbePairResult{}, fmt.Errorf("marshal probe 0: %w", err) } - rtt0, reply0, err := s.sendAndRecv(ctx, buf0[:], probe0, true, busyPollWindow) + // Reply 0: verify=false to match probePairUnchallenged, which returns Reply 0 + // to the caller regardless of signature validity. The caller's .Verify() then + // surfaces sig failures as reply0_sig_valid=false in the per-pair log, + // distinguishing "bogus reply" from "no reply at all". A spoofed Reply 0 just + // produces a bogus nonce — the legitimate reflector will never recognize it, + // so Reply 1 either won't arrive or won't be flagged Challenged. + rtt0, reply0, err := s.sendAndRecv(ctx, buf0[:], probe0, false, busyPollWindow) if err != nil { return ProbePairResult{}, fmt.Errorf("probe 0: %w", err) } @@ -343,6 +349,8 @@ func (s *LinuxSender) probePairChallenged(ctx context.Context) (ProbePairResult, return ProbePairResult{}, fmt.Errorf("marshal probe 1: %w", err) } + // Reply 1: verify=true matches probePairUnchallenged's Reply 1. Bad-sig Reply 1 + // is silently dropped and the caller sees a timeout — symmetric across paths. rtt1, reply1, err := s.sendAndRecv(ctx, buf1[:], probe1, true, busyPollWindow) if err != nil { return ProbePairResult{}, fmt.Errorf("probe 1: %w", err)