Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 31 additions & 25 deletions controlplane/telemetry/cmd/geoprobe-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
solanarpc "github.com/gagliardetto/solana-go/rpc"
"github.com/malbeclabs/doublezero/config"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/geoprobe"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/metrics"
geolocation "github.com/malbeclabs/doublezero/sdk/geolocation/go"
twamplight "github.com/malbeclabs/doublezero/tools/twamp/pkg/light"
"github.com/malbeclabs/doublezero/tools/twamp/pkg/signed"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

Expand Down Expand Up @@ -337,9 +337,11 @@ func main() {
"geoprobe_pubkey", geoProbePubkey,
)

// Set up prometheus metrics server if enabled.
// Set up prometheus metrics.
m := geoprobe.NewMetrics(geoprobe.SourceGeoProbeAgent, geoProbePubkey.String(), prometheus.DefaultRegisterer)

if *metricsEnable {
metrics.GeoProbeBuildInfo.WithLabelValues(version, commit, date).Set(1)
m.BuildInfo.WithLabelValues(version, commit, date).Set(1)
go func() {
listener, err := net.Listen("tcp", *metricsAddr)
if err != nil {
Expand Down Expand Up @@ -491,7 +493,7 @@ func main() {

// Run UDP offset listener.
go func() {
runOffsetListener(ctx, log, offsetListener, cache, pState, signedReflector)
runOffsetListener(ctx, log, offsetListener, cache, pState, signedReflector, m)
}()

// Run eviction goroutine.
Expand Down Expand Up @@ -552,7 +554,7 @@ func main() {
return
case update := <-parentUpdateCh:
pState.update(update.Authorities)
metrics.GeoProbeParentsDiscovered.Set(float64(len(update.Authorities)))
m.ParentsDiscovered.Set(float64(len(update.Authorities)))
log.Info("Updated parent authorities from discovery",
"totalParents", len(update.Authorities))
}
Expand Down Expand Up @@ -585,12 +587,12 @@ func main() {
if pd != nil {
start := time.Now()
pd.Tick(ctx, parentUpdateCh)
metrics.GeoProbeParentDiscoveryDuration.Observe(time.Since(start).Seconds())
m.ParentDiscoveryDuration.Observe(time.Since(start).Seconds())
}
if td != nil {
start := time.Now()
td.Tick(ctx, targetUpdateCh, inboundKeyCh, icmpTargetUpdateCh)
metrics.GeoProbeTargetDiscoveryDuration.Observe(time.Since(start).Seconds())
m.TargetDiscoveryDuration.Observe(time.Since(start).Seconds())
}
}

Expand Down Expand Up @@ -621,6 +623,7 @@ func main() {
senderConn: senderConn,
getCurrentSlot: getCurrentSlot,
signedReflector: signedReflector,
metrics: m,
targetUpdateCh: targetUpdateCh,
icmpTargetUpdateCh: icmpTargetUpdateCh,
inboundKeyCh: inboundKeyCh,
Expand All @@ -647,6 +650,7 @@ func runOffsetListener(
cache *offsetCache,
parents *parentState,
signedReflector signed.Reflector,
m *geoprobe.Metrics,
) {
log.Info("Starting offset listener", "addr", conn.LocalAddr().String())

Expand All @@ -672,7 +676,7 @@ func runOffsetListener(
return
}
log.Warn("Failed to receive offset", "error", err)
metrics.GeoProbeErrors.WithLabelValues(metrics.GeoProbeErrorTypeOffsetReceive).Inc()
m.Errors.WithLabelValues(geoprobe.ErrorTypeOffsetReceive).Inc()
continue
}

Expand All @@ -685,7 +689,7 @@ func runOffsetListener(
expectedAuthority, knownParent := parents.getAuthority(offset.SenderPubkey)
if !knownParent {
log.Debug("Rejecting offset from unknown parent", "sender_pubkey", senderPK, "addr", addr)
metrics.GeoProbeOffsetsRejected.WithLabelValues(metrics.GeoProbeRejectUnknownParent).Inc()
m.OffsetsRejected.WithLabelValues(geoprobe.RejectUnknownParent).Inc()
continue
}
if expectedAuthority != offset.AuthorityPubkey {
Expand All @@ -694,22 +698,22 @@ func runOffsetListener(
"expected_authority", solana.PublicKeyFromBytes(expectedAuthority[:]).String(),
"actual_authority", authorityPK,
"addr", addr)
metrics.GeoProbeOffsetsRejected.WithLabelValues(metrics.GeoProbeRejectWrongAuthority).Inc()
m.OffsetsRejected.WithLabelValues(geoprobe.RejectWrongAuthority).Inc()
continue
}

// Verify signature chain (top-level and all references).
if err := geoprobe.VerifyOffsetChain(offset); err != nil {
log.Warn("Offset signature verification failed", "authority_pubkey", authorityPK, "addr", addr, "error", err)
metrics.GeoProbeOffsetsRejected.WithLabelValues(metrics.GeoProbeRejectInvalidSignature).Inc()
m.OffsetsRejected.WithLabelValues(geoprobe.RejectInvalidSignature).Inc()
continue
}

log.Debug("signature verification successful", "authority_pubkey", authorityPK)

cache.Put(offset)
signedReflector.SetOffsets(marshalBestOffset(cache))
metrics.GeoProbeOffsetsReceived.Inc()
m.OffsetsReceived.Inc()

log.Debug("Cached DZD offset",
"authority_pubkey", authorityPK,
Expand All @@ -734,6 +738,7 @@ type measurementLoop struct {
senderConn *net.UDPConn
getCurrentSlot func(ctx context.Context) (uint64, error)
signedReflector signed.Reflector
metrics *geoprobe.Metrics

targets []geoprobe.ProbeAddress
icmpTargets []geoprobe.ProbeAddress
Expand Down Expand Up @@ -810,10 +815,10 @@ func (ml *measurementLoop) run() error {
func(addr geoprobe.ProbeAddress) (uint64, bool) { return ml.pinger.MeasureOne(ml.ctx, addr) },
)
ml.targets = newTargets
metrics.GeoProbeTargetsDiscovered.Set(float64(len(ml.targets)))
ml.metrics.TargetsDiscovered.Set(float64(len(ml.targets)))
ml.log.Info("Updated targets from discovery", "totalTargets", len(ml.targets))
if len(rttData) > 0 {
sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot)
sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot, ml.metrics)
}

case icmpUpdate := <-ml.icmpTargetUpdateCh:
Expand All @@ -825,10 +830,10 @@ func (ml *measurementLoop) run() error {
func(addr geoprobe.ProbeAddress) (uint64, bool) { return ml.icmpPinger.MeasureOne(ml.ctx, addr) },
)
ml.icmpTargets = newTargets
metrics.GeoProbeIcmpTargetsDiscovered.Set(float64(len(ml.icmpTargets)))
ml.metrics.IcmpTargetsDiscovered.Set(float64(len(ml.icmpTargets)))
ml.log.Info("Updated ICMP targets from discovery", "totalIcmpTargets", len(ml.icmpTargets))
if len(rttData) > 0 {
sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot)
sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot, ml.metrics)
}

case keyUpdate := <-ml.inboundKeyCh:
Expand All @@ -848,7 +853,7 @@ func (ml *measurementLoop) runCycle() {
ml.log.Debug("Starting measurement cycle", "targets", len(ml.targets), "icmpTargets", len(ml.icmpTargets))
start := time.Now()
defer func() {
metrics.GeoProbeMeasurementCycleDuration.Observe(time.Since(start).Seconds())
ml.metrics.MeasurementCycleDuration.Observe(time.Since(start).Seconds())
}()

rttData := make(map[geoprobe.ProbeAddress]uint64)
Expand All @@ -857,7 +862,7 @@ func (ml *measurementLoop) runCycle() {
twampResults, err := ml.pinger.MeasureAll(ml.ctx)
if err != nil {
ml.log.Error("Failed to measure TWAMP targets", "error", err)
metrics.GeoProbeErrors.WithLabelValues(metrics.GeoProbeErrorTypeMeasurementCycle).Inc()
ml.metrics.Errors.WithLabelValues(geoprobe.ErrorTypeMeasurementCycle).Inc()
} else {
for k, v := range twampResults {
rttData[k] = v
Expand All @@ -868,10 +873,10 @@ func (ml *measurementLoop) runCycle() {
if len(ml.icmpTargets) > 0 {
icmpStart := time.Now()
icmpResults, err := ml.icmpPinger.MeasureAll(ml.ctx)
metrics.GeoProbeIcmpMeasurementCycleDuration.Observe(time.Since(icmpStart).Seconds())
ml.metrics.IcmpMeasurementCycleDuration.Observe(time.Since(icmpStart).Seconds())
if err != nil {
ml.log.Error("Failed to measure ICMP targets", "error", err)
metrics.GeoProbeErrors.WithLabelValues(metrics.GeoProbeErrorTypeIcmpMeasurementCycle).Inc()
ml.metrics.Errors.WithLabelValues(geoprobe.ErrorTypeIcmpMeasurementCycle).Inc()
} else {
for k, v := range icmpResults {
rttData[k] = v
Expand All @@ -888,7 +893,7 @@ func (ml *measurementLoop) runCycle() {
ml.log.Debug("target measurement result", "target", addr.Host, "rtt_ms", float64(rttNs)/1000000.0)
}

sent := sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot)
sent := sendCompositeOffsets(ml.ctx, ml.log, rttData, ml.cache, ml.signer, ml.senderConn, ml.getCurrentSlot, ml.metrics)

ml.log.Info("Completed measurement cycle",
"measured", len(rttData),
Expand All @@ -905,6 +910,7 @@ func sendCompositeOffsets(
signer *geoprobe.OffsetSigner,
senderConn *net.UDPConn,
getCurrentSlot func(ctx context.Context) (uint64, error),
m *geoprobe.Metrics,
) int {
dzdOffset := cache.GetBest()
if dzdOffset == nil {
Expand All @@ -915,7 +921,7 @@ func sendCompositeOffsets(
slot, err := getCurrentSlot(ctx)
if err != nil {
log.Error("Failed to get current slot", "error", err)
metrics.GeoProbeErrors.WithLabelValues(metrics.GeoProbeErrorTypeSlotFetch).Inc()
m.Errors.WithLabelValues(geoprobe.ErrorTypeSlotFetch).Inc()
return 0
}

Expand All @@ -937,19 +943,19 @@ func sendCompositeOffsets(

if err := signer.SignOffset(&compositeOffset); err != nil {
log.Error("Failed to sign composite offset", "target", addr, "error", err)
metrics.GeoProbeErrors.WithLabelValues(metrics.GeoProbeErrorTypeSignOffset).Inc()
m.Errors.WithLabelValues(geoprobe.ErrorTypeSignOffset).Inc()
continue
}

targetAddr := &net.UDPAddr{IP: net.ParseIP(addr.Host), Port: int(addr.Port)}
if err := geoprobe.SendOffset(senderConn, targetAddr, &compositeOffset); err != nil {
log.Error("Failed to send composite offset", "target", addr, "error", err)
metrics.GeoProbeErrors.WithLabelValues(metrics.GeoProbeErrorTypeSendOffset).Inc()
m.Errors.WithLabelValues(geoprobe.ErrorTypeSendOffset).Inc()
continue
}

sentCount++
metrics.GeoProbeCompositeOffsetsSent.Inc()
m.CompositeOffsetsSent.Inc()
log.Debug("Sent composite offset to target",
"target", addr,
"slot", slot,
Expand Down
Loading
Loading