Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ All notable changes to this project will be documented in this file.
- gnmi-writer now stores per-interface unicast/multicast/broadcast packet counters (in/out) and FCS errors in the `interface_state` table, bringing it closer to parity with the interface counters already collected in InfluxDB. These already stream in the gNMI interface-state subtree, so there is no added collection cost; existing `interface_state` rows read back `0` for the new columns
- Remove the InfluxDB writer from global-monitor; ClickHouse is now the sole telemetry backend, dropping the `INFLUX_*` env var configuration
- Signed-TWAMP reflector in `geoprobe-agent` issues a per-pair 8-byte challenge nonce in `Reply0.SinceLastRxNs` and flags `Reply1.NumOffsets` bit 7 (new `Challenged` field on `ReplyPacket`) when `Probe1.Sec || Frac` echoes the nonce — proves the sender received Reply 0 before sending Probe 1, closing the pre-emit-Probe-1 attack on `SinceLastRxNs`. Backwards-compatible: legacy senders never echo the nonce so the flag bit stays 0, and they always ignored Reply 0's previously-zero `SinceLastRxNs`. Documented in RFC16's new "Challenge-Response Inbound Probing" subsection ([#3737](https://github.com/malbeclabs/doublezero/pull/3737))
- `geoprobe-target-sender` gains opt-in `--challenged` flag (default off). When set, the sender extracts the nonce from `Reply0.SinceLastRxNs`, writes it into `Probe1.Sec || Frac`, signs Probe 1 only after Reply 0 is parsed, and surfaces `Reply1.Challenged` on every per-pair log line (JSON `"challenged"`, text `Challenged Inbound:`). Default off preserves the existing pre-sign-both / fire-Probe-1-immediately fast path byte-for-byte. Trade-off: challenged mode inflates `Reply1.SinceLastRxNs` by the sender's Probe 1 signing latency ([#3738](https://github.com/malbeclabs/doublezero/pull/3738))
- CLI
- Drop the activator-only pollers from `doublezero` (user and multicastgroup activation waits). The `--wait` flag on `user create`, `user create-subscribe`, `user subscribe`, `multicastgroup create`, and `multicastgroup update` now fetches the post-create state once instead of polling; creates are atomic to `Activated` post-RFC-11, so the wait loop was watching a transition that no longer happens ([#3614](https://github.com/malbeclabs/doublezero/issues/3614))
- `doublezero geolocation` `probe ...` and `user ...` mirrors `doublezero-geolocation` versions; new `--geo-program-id` global flag, `config get/set` include Geolocation Program ID; new `-init-geolocation-config` for init of geolocation program
Expand Down
10 changes: 8 additions & 2 deletions controlplane/telemetry/cmd/geoprobe-target-sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
logFormat = flag.String("log-format", "text", "Log format: text or json")
verbose = flag.Bool("verbose", false, "Enable debug logging")
showVersion = flag.Bool("version", false, "Print version and exit")
challenged = flag.Bool("challenged", false, "Use challenge-response inbound probing (nonce echo). Adds sender-side compute time between Reply 0 and Probe 1; only set when the geoprobe agent supports RFC16 challenged inbound. Default: off (preserves the fast pre-signed flow).")

version = "dev"
commit = "none"
Expand Down Expand Up @@ -77,6 +78,7 @@ func main() {
"timeout", *timeout,
"count", *count,
"max_measurement_age", *maxMeasurementAge,
"challenged", *challenged,
)

cache := geoprobe.NewMinCache[measurement](*maxMeasurementAge, func(m measurement) uint64 {
Expand Down Expand Up @@ -115,7 +117,7 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

sender, err := signed.NewSender(ctx, "", localAddr, remoteAddr, signer, remotePubkey)
sender, err := signed.NewSender(ctx, "", localAddr, remoteAddr, signer, remotePubkey, *challenged)
if err != nil {
log.Error("failed to create sender", "error", err)
os.Exit(1)
Expand Down Expand Up @@ -190,7 +192,8 @@ func probePair(ctx context.Context, log *slog.Logger, sender signed.Sender, seq
"reply0_probe_sig", reply0ProbeSigValid,
"reply0_sig", reply0SigValid,
"reply1_probe_sig", reply1ProbeSigValid,
"reply1_sig", reply1SigValid)
"reply1_sig", reply1SigValid,
"challenged", result.Reply1.Challenged)

m := measurement{
probeMeasuredRttNs: probeMeasuredRttNs,
Expand Down Expand Up @@ -235,6 +238,7 @@ func logPairedResult(log *slog.Logger, seq uint32, probeMeasuredRttNs uint64, ta
MaxDistanceMiles: distMiles,
MaxDistanceKm: distMiles * kmPerMile,
SinceLastRxNs: reply.SinceLastRxNs,
Challenged: reply.Challenged,
AuthorityPubkey: authorityPK.String(),
GeoprobePubkey: geoprobePK.String(),
Offsets: offsets,
Expand Down Expand Up @@ -320,6 +324,7 @@ type probeOutput struct {
MaxDistanceMiles float64 `json:"max_distance_miles,omitempty"`
MaxDistanceKm float64 `json:"max_distance_km,omitempty"`
SinceLastRxNs uint64 `json:"since_last_rx_ns,omitempty"`
Challenged bool `json:"challenged"`
AuthorityPubkey string `json:"authority_pubkey,omitempty"`
GeoprobePubkey string `json:"geoprobe_pubkey,omitempty"`
Offsets []offsetOutput `json:"offsets,omitempty"`
Expand Down Expand Up @@ -419,6 +424,7 @@ func formatTextResult(seq uint32, probeMeasuredRttNs uint64, targetMeasuredRtt t
fmt.Fprintf(&sb, "[%s] Probe Pair #%d\n", time.Now().UTC().Format("2006-01-02 15:04:05 MST"), seq)
fmt.Fprintf(&sb, " Probe-Measured RTT: %s\n", formatNsAsMs(probeMeasuredRttNs))
fmt.Fprintf(&sb, " Target-Measured RTT: %s\n", formatRTT(targetMeasuredRtt))
fmt.Fprintf(&sb, " Challenged Inbound: %v\n", reply.Challenged)
accumDistMiles := calculateMaxDistance(reply.RttNs)
accumDistKm := accumDistMiles * kmPerMile
fmt.Fprintf(&sb, " Reference Point: %s\n", formatCoordinate(reply.Lat, reply.Lng))
Expand Down
40 changes: 40 additions & 0 deletions controlplane/telemetry/cmd/geoprobe-target-sender/main_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"crypto/ed25519"
"encoding/json"
"testing"
Expand Down Expand Up @@ -193,6 +194,45 @@ func TestProbeOutput_SuccessJSON_OmitsError(t *testing.T) {
}
}

func TestProbeOutput_ChallengedFieldRoundTrip(t *testing.T) {
out := probeOutput{
Timestamp: "2026-05-20T12:00:00Z",
Seq: 42,
Challenged: true,
}
data, err := json.Marshal(out)
if err != nil {
t.Fatalf("Marshal: %v", err)
}
var back probeOutput
if err := json.Unmarshal(data, &back); err != nil {
t.Fatalf("Unmarshal: %v", err)
}
if !back.Challenged {
t.Fatalf("Challenged field did not round-trip; got JSON: %s", data)
}
if !bytes.Contains(data, []byte(`"challenged":true`)) {
t.Fatalf("expected JSON to contain \"challenged\":true; got: %s", data)
}
}

// Challenged is intentionally NOT omitempty: false is the default mode and
// downstream consumers must be able to distinguish "challenged off" from
// "field absent because the binary predates this PR".
func TestProbeOutput_ChallengedFalseStillSerialized(t *testing.T) {
out := probeOutput{
Timestamp: "2026-05-20T12:00:00Z",
Seq: 43,
}
data, err := json.Marshal(out)
if err != nil {
t.Fatalf("Marshal: %v", err)
}
if !bytes.Contains(data, []byte(`"challenged":false`)) {
t.Fatalf("expected JSON to contain \"challenged\":false; got: %s", data)
}
}

func TestNewEd25519Signer_Integration(t *testing.T) {
wallet := solana.NewWallet()
signer := signed.NewEd25519Signer(ed25519.PrivateKey(wallet.PrivateKey))
Expand Down
80 changes: 56 additions & 24 deletions e2e/geoprobe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ func TestE2E_GeoprobeDiscovery(t *testing.T) {
dn.Manager.GeolocationProgramID, dn.Manager.ServiceabilityProgramID,
&geoprobeAgentOpts{
probeInterval: 5 * time.Second,
// Short verify-interval so the two sequential target-sender runs
// (unchallenged then challenged) aren't rate-limited.
verifyInterval: 1 * time.Second,
})
log.Debug("==> Geoprobe agent started", "pubkey", agent.agentPubkey)

Expand All @@ -280,15 +283,27 @@ func TestE2E_GeoprobeDiscovery(t *testing.T) {
verifyClickhouseOffsets(t, chContainerID)
log.Debug("==> ClickHouse verification passed")

// --- Inbound flow ---
// --- Inbound flow (unchallenged) ---
// Run target-sender from the target container to send signed TWAMP probes to the agent.
// The agent should reply with cached DZD offsets embedded in signed replies.
log.Debug("==> Running target-sender for inbound probing")
runTargetSender(t, targetContainerID, geoprobeIPStr, agent.agentPubkey, senderKeypairPath)

log.Debug("==> Waiting for successful inbound probe with offsets")
waitForInboundProbeSuccess(t, targetContainerID, 120*time.Second)
log.Debug("==> Inbound probing verified")
log.Debug("==> Running target-sender for inbound probing (unchallenged)")
const unchallengedLog = "/tmp/target-sender-unchallenged.log"
runTargetSender(t, targetContainerID, geoprobeIPStr, agent.agentPubkey, senderKeypairPath, unchallengedLog, false)

log.Debug("==> Waiting for successful unchallenged inbound probe with offsets")
waitForInboundProbeSuccess(t, targetContainerID, unchallengedLog, false, 120*time.Second)
log.Debug("==> Unchallenged inbound probing verified")

// --- Inbound flow (challenged) ---
// Re-run target-sender with --challenged so the sender echoes the Reply 0 nonce
// into Probe 1 (RFC16 challenge-response). The agent should flag Reply 1.Challenged=true.
log.Debug("==> Running target-sender for inbound probing (challenged)")
const challengedLog = "/tmp/target-sender-challenged.log"
runTargetSender(t, targetContainerID, geoprobeIPStr, agent.agentPubkey, senderKeypairPath, challengedLog, true)

log.Debug("==> Waiting for successful challenged inbound probe with offsets")
waitForInboundProbeSuccess(t, targetContainerID, challengedLog, true, 120*time.Second)
log.Debug("==> Challenged inbound probing verified")
}

// getExchangePK retrieves the onchain PK for an exchange by its code.
Expand Down Expand Up @@ -365,8 +380,9 @@ func addGeoprobeParent(t *testing.T, dn *devnet.Devnet, code, devicePK string) {
}

type geoprobeAgentOpts struct {
probeInterval time.Duration
capNetRaw bool // Add CAP_NET_RAW for ICMP probing.
probeInterval time.Duration
verifyInterval time.Duration // Rate-limit window for inbound probing (0 = agent default).
capNetRaw bool // Add CAP_NET_RAW for ICMP probing.
}

// geoprobeAgentResult holds the container ID and generated pubkeys from starting a geoprobe agent.
Expand Down Expand Up @@ -406,6 +422,9 @@ func startGeoprobeAgent(t *testing.T, log *slog.Logger, dn *devnet.Devnet, cyoaI
if opts != nil && opts.probeInterval > 0 {
cmd = append(cmd, "-probe-interval", opts.probeInterval.String())
}
if opts != nil && opts.verifyInterval > 0 {
cmd = append(cmd, "-verify-interval", opts.verifyInterval.String())
}

req := testcontainers.ContainerRequest{
Image: geoprobeImage,
Expand Down Expand Up @@ -624,14 +643,20 @@ func waitForTargetOffsetReceived(t *testing.T, containerID string, timeout time.
}, timeout, 5*time.Second, "Expected geoprobe-target to log 'received LocationOffset' with 'signature_valid=true'")
}

// runTargetSender starts geoprobe-target-sender in the target container, sending
// signed TWAMP probes to the agent's reflector. It runs with --count 3 so it
// exits after 3 probe pairs.
func runTargetSender(t *testing.T, containerID, agentIP, agentPubkey, keypairPath string) {
// runTargetSender runs geoprobe-target-sender in the target container to send 3
// signed TWAMP probe pairs to the agent's reflector, blocking until the process
// exits. Writes JSON output to logPath; pass challenged=true to exercise the
// RFC16 nonce-echo path.
func runTargetSender(t *testing.T, containerID, agentIP, agentPubkey, keypairPath, logPath string, challenged bool) {
t.Helper()

challengedFlag := ""
if challenged {
challengedFlag = "-challenged "
}

cmd := fmt.Sprintf(
"nohup doublezero-geoprobe-target-sender "+
"doublezero-geoprobe-target-sender "+
"-probe-ip %s "+
"-probe-port 8924 "+
"-probe-pk %s "+
Expand All @@ -640,8 +665,9 @@ func runTargetSender(t *testing.T, containerID, agentIP, agentPubkey, keypairPat
"-interval 5s "+
"-log-format json "+
"-verbose "+
"> /tmp/target-sender.log 2>&1 &",
agentIP, agentPubkey, keypairPath,
"%s"+
"> %s 2>&1",
agentIP, agentPubkey, keypairPath, challengedFlag, logPath,
)
_, err := docker.Exec(t.Context(), dockerClient, containerID, []string{"bash", "-c", cmd})
require.NoError(t, err)
Expand All @@ -653,10 +679,10 @@ func runTargetSender(t *testing.T, containerID, agentIP, agentPubkey, keypairPat
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
output, err := docker.Exec(ctx, dockerClient, containerID, []string{
"bash", "-c", "cat /tmp/target-sender.log 2>/dev/null || echo 'no log file'",
"bash", "-c", fmt.Sprintf("cat %s 2>/dev/null || echo 'no log file'", logPath),
})
if err == nil {
fmt.Fprintf(os.Stderr, "\n--- Target-sender log ---\n%s\n", string(output))
fmt.Fprintf(os.Stderr, "\n--- Target-sender log (%s) ---\n%s\n", logPath, string(output))
}
})
}
Expand Down Expand Up @@ -1001,28 +1027,31 @@ func verifyClickhouseOffsets(t *testing.T, chContainerID string) {
}, 60*time.Second, 5*time.Second, "Expected location_offsets to contain rows with valid signatures and non-zero rtt_ns")
}

// waitForInboundProbeSuccess polls the target-sender log for a successful probe pair
// where reply signatures are valid and DZD offset data is present.
func waitForInboundProbeSuccess(t *testing.T, containerID string, timeout time.Duration) {
// waitForInboundProbeSuccess polls the target-sender log at logPath for a successful
// probe pair where reply signatures are valid, DZD offset data is present, and the
// challenged flag matches wantChallenged.
func waitForInboundProbeSuccess(t *testing.T, containerID, logPath string, wantChallenged bool, timeout time.Duration) {
t.Helper()

require.Eventually(t, func() bool {
output, err := docker.Exec(t.Context(), dockerClient, containerID, []string{
"bash", "-c", "cat /tmp/target-sender.log 2>/dev/null",
"bash", "-c", fmt.Sprintf("cat %s 2>/dev/null", logPath),
})
if err != nil {
return false
}
logStr := string(output)

// Look for a JSON line with valid reply signatures and non-empty offsets.
// Look for a JSON line with valid reply signatures, non-empty offsets, and
// the expected challenged flag.
for _, line := range strings.Split(logStr, "\n") {
line = strings.TrimSpace(line)
if !strings.HasPrefix(line, "{") {
continue
}
var result struct {
Reply1SigValid bool `json:"reply1_sig_valid"`
Challenged bool `json:"challenged"`
Offsets []struct {
SigValid bool `json:"sig_valid"`
} `json:"offsets"`
Expand All @@ -1034,12 +1063,15 @@ func waitForInboundProbeSuccess(t *testing.T, containerID string, timeout time.D
if result.Error != "" || !result.Reply1SigValid {
continue
}
if result.Challenged != wantChallenged {
continue
}
if len(result.Offsets) > 0 && result.Offsets[0].SigValid {
return true
}
}
return false
}, timeout, 5*time.Second, "Expected target-sender log to contain a successful probe pair with valid signatures and offsets")
}, timeout, 5*time.Second, "Expected target-sender log at %s to contain a successful probe pair with challenged=%v, valid signatures, and offsets", logPath, wantChallenged)
}

// TestE2E_GeoprobeResultDestination verifies that composite offsets are routed to a
Expand Down
24 changes: 24 additions & 0 deletions tools/twamp/pkg/signed/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,30 @@ func NewProbePacket(seq uint32, signer Signer) *ProbePacket {
return p
}

// newChallengedProbePacket constructs a probe whose Sec/Frac carry the
// challenge nonce (BigEndian: Sec = upper 4 bytes, Frac = lower 4 bytes)
// rather than an NTP timestamp. The signature is computed over the
// nonce-bearing payload. Used by senders running in challenged mode.
func newChallengedProbePacket(seq uint32, signer Signer, nonce uint64) *ProbePacket {
pub := signer.Public()
p := &ProbePacket{
Seq: seq,
Sec: uint32(nonce >> 32),
Frac: uint32(nonce & 0xFFFFFFFF),
}
copy(p.SenderPubkey[:], pub)

var payload [probePayloadSize]byte
binary.BigEndian.PutUint32(payload[0:4], p.Seq)
binary.BigEndian.PutUint32(payload[4:8], p.Sec)
binary.BigEndian.PutUint32(payload[8:12], p.Frac)
copy(payload[12:44], p.SenderPubkey[:])

sig := signer.Sign(payload[:])
copy(p.Signature[:], sig)
return p
}

func (p *ProbePacket) Marshal(buf []byte) error {
if len(buf) < ProbePacketSize {
return fmt.Errorf("buffer too small: %d < %d", len(buf), ProbePacketSize)
Expand Down
6 changes: 4 additions & 2 deletions tools/twamp/pkg/signed/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type Sender interface {
}

// NewSender creates a signed TWAMP sender. Only localAddr.Port is used; any IP is ignored.
func NewSender(ctx context.Context, iface string, localAddr, remoteAddr *net.UDPAddr, signer Signer, remotePubkey [32]byte) (Sender, error) {
return NewLinuxSender(ctx, iface, localAddr, remoteAddr, signer, remotePubkey)
// When challenged is true, ProbePair uses the challenge-response flow: Probe 1 is signed
// with the nonce echoed from Reply 0 rather than an NTP timestamp.
func NewSender(ctx context.Context, iface string, localAddr, remoteAddr *net.UDPAddr, signer Signer, remotePubkey [32]byte, challenged bool) (Sender, error) {
return NewLinuxSender(ctx, iface, localAddr, remoteAddr, signer, remotePubkey, challenged)
}
Loading
Loading