Skip to content

Commit 694c4fe

Browse files
authored
device/telemetry: use ledger epoch (#1004)
## Summary of Changes - Update the device telemetry agent to use epoch from the DZ ledger instead of wallclock epoch - Resolves #1002 - Resolves #1003 - The telemetry data/dashboard API was not updated to support this yet, that can happen separately ## Testing Verification - Existing tests updated to match the change
1 parent 92bf77e commit 694c4fe

22 files changed

+432
-149
lines changed

controlplane/telemetry/cmd/telemetry/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,13 @@ func main() {
261261
TWAMPReflector: reflector,
262262
PeerDiscovery: peerDiscovery,
263263
TelemetryProgramClient: sdktelemetry.New(log, rpcClient, &keypair, telemetryProgramID),
264+
GetCurrentEpochFunc: func(ctx context.Context) (uint64, error) {
265+
epochInfo, err := rpcClient.GetEpochInfo(ctx, solanarpc.CommitmentFinalized)
266+
if err != nil {
267+
return 0, err
268+
}
269+
return epochInfo.Epoch, nil
270+
},
264271
})
265272
if err != nil {
266273
log.Error("failed to create telemetry collector", "error", err)

controlplane/telemetry/internal/data/latencies.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"sort"
88
"time"
99

10-
"github.com/malbeclabs/doublezero/controlplane/telemetry/pkg/data"
1110
"github.com/malbeclabs/doublezero/smartcontract/sdk/go/telemetry"
1211
)
1312

@@ -77,7 +76,7 @@ func (p *provider) GetCircuitLatenciesForEpoch(ctx context.Context, circuitCode
7776
samples := enrichSamplesWithTimestamps(account.Samples, account.StartTimestampMicroseconds, account.SamplingIntervalMicroseconds)
7877

7978
// If the epoch is sufficiently in the past, cache for much longer.
80-
currentEpoch := data.DeriveEpoch(time.Now())
79+
currentEpoch := DeriveEpoch(time.Now())
8180
ttl := p.cfg.CurrentEpochLatenciesCacheTTL
8281
if epoch < currentEpoch-1 {
8382
ttl = p.cfg.HistoricEpochLatenciesCacheTTL
@@ -88,8 +87,8 @@ func (p *provider) GetCircuitLatenciesForEpoch(ctx context.Context, circuitCode
8887
}
8988

9089
func (p *provider) GetCircuitLatencies(ctx context.Context, circuitCode string, from, to time.Time) ([]CircuitLatencySample, error) {
91-
startEpoch := data.DeriveEpoch(from)
92-
endEpoch := data.DeriveEpoch(to)
90+
startEpoch := DeriveEpoch(from)
91+
endEpoch := DeriveEpoch(to)
9392

9493
var latencies []CircuitLatencySample
9594

controlplane/telemetry/internal/data/latencies_test.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010

1111
"github.com/gagliardetto/solana-go"
1212
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/data"
13-
datapkg "github.com/malbeclabs/doublezero/controlplane/telemetry/pkg/data"
1413
"github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability"
1514
"github.com/malbeclabs/doublezero/smartcontract/sdk/go/telemetry"
1615
"github.com/stretchr/testify/assert"
@@ -39,7 +38,7 @@ func TestTelemetry_Data_Provider_GetCircuitLatencies(t *testing.T) {
3938
}, defaultCircuit())
4039

4140
ctx := context.Background()
42-
epoch := datapkg.DeriveEpoch(time.Now())
41+
epoch := data.DeriveEpoch(time.Now())
4342

4443
first, err := provider.GetCircuitLatenciesForEpoch(ctx, "A → B (L1)", epoch)
4544
require.NoError(t, err)
@@ -57,7 +56,7 @@ func TestTelemetry_Data_Provider_GetCircuitLatencies(t *testing.T) {
5756
return nil, telemetry.ErrAccountNotFound
5857
}, defaultCircuit())
5958

60-
epoch := datapkg.DeriveEpoch(time.Now())
59+
epoch := data.DeriveEpoch(time.Now())
6160
latencies, err := provider.GetCircuitLatenciesForEpoch(context.Background(), "A → B (L1)", epoch)
6261
assert.ErrorIs(t, err, telemetry.ErrAccountNotFound)
6362
assert.Empty(t, latencies)
@@ -66,8 +65,7 @@ func TestTelemetry_Data_Provider_GetCircuitLatencies(t *testing.T) {
6665
t.Run("GetCircuitLatencies filters by time", func(t *testing.T) {
6766
t.Parallel()
6867

69-
now := time.Now()
70-
sampleTime := now.Add(-5 * time.Minute).UTC()
68+
sampleTime := time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC)
7169
sampleMicros := uint64(sampleTime.UnixMicro())
7270

7371
provider := newTestProvider(t, func(epoch uint64) (*telemetry.DeviceLatencySamples, error) {
@@ -80,8 +78,8 @@ func TestTelemetry_Data_Provider_GetCircuitLatencies(t *testing.T) {
8078
}, nil
8179
}, defaultCircuit())
8280

83-
from := now.Add(-10 * time.Minute)
84-
to := now
81+
from := time.Date(2023, 1, 1, 11, 55, 0, 0, time.UTC)
82+
to := time.Date(2023, 1, 1, 12, 5, 0, 0, time.UTC)
8583
latencies, err := provider.GetCircuitLatencies(context.Background(), "A → B (L1)", from, to)
8684
require.NoError(t, err)
8785
require.Len(t, latencies, 1)
@@ -116,7 +114,7 @@ func TestTelemetry_Data_Provider_GetCircuitLatencies(t *testing.T) {
116114
t.Run("Downsampled returns multiple buckets when maxPoints > 1", func(t *testing.T) {
117115
t.Parallel()
118116

119-
now := time.Now().UTC()
117+
now := time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC)
120118
tsMicros := uint64(now.UnixMicro())
121119

122120
provider := newTestProvider(t, func(epoch uint64) (*telemetry.DeviceLatencySamples, error) {
@@ -149,7 +147,7 @@ func TestTelemetry_Data_Provider_GetCircuitLatencies(t *testing.T) {
149147
}, nil
150148
}, defaultCircuit())
151149

152-
now := time.Now()
150+
now := time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC)
153151
stats, err := provider.GetCircuitLatenciesDownsampled(context.Background(), "A → B (L1)", now, now.Add(1*time.Minute), 1, data.UnitMicrosecond)
154152
require.NoError(t, err)
155153
assert.Len(t, stats, 0)
@@ -162,7 +160,7 @@ func TestTelemetry_Data_Provider_GetCircuitLatencies(t *testing.T) {
162160
return &telemetry.DeviceLatencySamples{}, nil
163161
}, defaultCircuit())
164162

165-
now := time.Now()
163+
now := time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC)
166164
stats, err := provider.GetCircuitLatenciesDownsampled(t.Context(), "A → B (L1)", now, now.Add(1*time.Second), 1, "invalid")
167165
require.Error(t, err)
168166
assert.Empty(t, stats)

controlplane/telemetry/internal/telemetry/collector.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,21 +46,27 @@ func New(log *slog.Logger, cfg Config) (*Collector, error) {
4646
buffer: buffer,
4747
}
4848

49-
c.submitter = NewSubmitter(log, &SubmitterConfig{
49+
var err error
50+
c.submitter, err = NewSubmitter(log, &SubmitterConfig{
5051
Interval: cfg.SubmissionInterval,
5152
Buffer: buffer,
5253
MetricsPublisherPK: cfg.MetricsPublisherPK,
5354
ProbeInterval: cfg.ProbeInterval,
5455
ProgramClient: cfg.TelemetryProgramClient,
56+
GetCurrentEpoch: cfg.GetCurrentEpochFunc,
5557
})
58+
if err != nil {
59+
return nil, fmt.Errorf("failed to create submitter: %w", err)
60+
}
5661

5762
c.pinger = NewPinger(log, &PingerConfig{
58-
LocalDevicePK: cfg.LocalDevicePK,
59-
Interval: cfg.ProbeInterval,
60-
ProbeTimeout: cfg.TWAMPSenderTimeout,
61-
Peers: cfg.PeerDiscovery,
62-
Buffer: buffer,
63-
GetSender: c.getOrCreateSender,
63+
LocalDevicePK: cfg.LocalDevicePK,
64+
Interval: cfg.ProbeInterval,
65+
ProbeTimeout: cfg.TWAMPSenderTimeout,
66+
Peers: cfg.PeerDiscovery,
67+
Buffer: buffer,
68+
GetSender: c.getOrCreateSender,
69+
GetCurrentEpoch: cfg.GetCurrentEpochFunc,
6470
})
6571

6672
return c, nil

controlplane/telemetry/internal/telemetry/collector_test.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,31 +75,31 @@ func TestAgentTelemetry_Collector(t *testing.T) {
7575
link2_1 := stringToPubkey("link2-1")
7676
link2_3 := stringToPubkey("link2-3")
7777

78-
ts := time.Now()
78+
epoch := uint64(100)
7979
originDevice1Link1_2Key := telemetry.AccountKey{
8080
OriginDevicePK: device1PK,
8181
TargetDevicePK: device2PK,
8282
LinkPK: link1_2,
83-
Epoch: telemetry.DeriveEpoch(ts),
83+
Epoch: epoch,
8484
}
8585
originDevice1Link1_3Key := telemetry.AccountKey{
8686
OriginDevicePK: device1PK,
8787
TargetDevicePK: device3PK,
8888
LinkPK: link1_3,
89-
Epoch: telemetry.DeriveEpoch(ts),
89+
Epoch: epoch,
9090
}
9191

9292
originDevice2Link2_1Key := telemetry.AccountKey{
9393
OriginDevicePK: device2PK,
9494
TargetDevicePK: device1PK,
9595
LinkPK: link2_1,
96-
Epoch: telemetry.DeriveEpoch(ts),
96+
Epoch: epoch,
9797
}
9898
originDevice2Link2_3Key := telemetry.AccountKey{
9999
OriginDevicePK: device2PK,
100100
TargetDevicePK: device3PK,
101101
LinkPK: link2_3,
102-
Epoch: telemetry.DeriveEpoch(ts),
102+
Epoch: epoch,
103103
}
104104

105105
telemetryProgram1 := newMemoryTelemetryProgramClient()
@@ -273,6 +273,9 @@ func TestAgentTelemetry_Collector(t *testing.T) {
273273
TWAMPReflector: reflector,
274274
PeerDiscovery: peerDiscovery,
275275
TelemetryProgramClient: telemetryProgram,
276+
GetCurrentEpochFunc: func(ctx context.Context) (uint64, error) {
277+
return 100, nil
278+
},
276279
})
277280
require.NoError(t, err)
278281

@@ -283,7 +286,7 @@ func TestAgentTelemetry_Collector(t *testing.T) {
283286
require.NoError(t, collector.Run(ctx))
284287
}()
285288

286-
epoch := telemetry.DeriveEpoch(time.Now())
289+
epoch := uint64(100)
287290

288291
accountKey := telemetry.AccountKey{
289292
OriginDevicePK: devicePK,
@@ -393,11 +396,13 @@ func TestAgentTelemetry_Collector(t *testing.T) {
393396
require.NoError(t, collector.Run(ctx))
394397
}()
395398

399+
epoch := uint64(100)
400+
396401
key := telemetry.AccountKey{
397402
OriginDevicePK: devicePK,
398403
TargetDevicePK: peerPK,
399404
LinkPK: linkPK,
400-
Epoch: telemetry.DeriveEpoch(time.Now()),
405+
Epoch: epoch,
401406
}
402407

403408
// Wait for multiple samples
@@ -434,6 +439,9 @@ func newTestCollector(t *testing.T, log *slog.Logger, localDevicePK solana.Publi
434439
TWAMPReflector: reflector,
435440
PeerDiscovery: peerDiscovery,
436441
TelemetryProgramClient: telemetryProgramClient,
442+
GetCurrentEpochFunc: func(ctx context.Context) (uint64, error) {
443+
return 100, nil
444+
},
437445
})
438446
require.NoError(t, err)
439447

controlplane/telemetry/internal/telemetry/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package telemetry
22

33
import (
4+
"context"
45
"errors"
56
"time"
67

@@ -15,6 +16,9 @@ type Config struct {
1516
// PeerDiscovery is the configured peer discovery implementation.
1617
PeerDiscovery PeerDiscovery
1718

19+
// GetCurrentEpochFunc is the function to get the current epoch.
20+
GetCurrentEpochFunc func(ctx context.Context) (uint64, error)
21+
1822
// TelemetryProgramClient is the client to the telemetry program.
1923
TelemetryProgramClient TelemetryProgramClient
2024

@@ -41,6 +45,9 @@ func (c *Config) Validate() error {
4145
if c.PeerDiscovery == nil {
4246
return errors.New("peer discovery is required")
4347
}
48+
if c.GetCurrentEpochFunc == nil {
49+
return errors.New("get current epoch is required")
50+
}
4451
if c.LocalDevicePK.IsZero() {
4552
return errors.New("local device pubkey is required")
4653
}

controlplane/telemetry/internal/telemetry/epoch.go

Lines changed: 0 additions & 10 deletions
This file was deleted.

controlplane/telemetry/internal/telemetry/epoch_test.go

Lines changed: 0 additions & 45 deletions
This file was deleted.

controlplane/telemetry/internal/telemetry/main_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (c *memoryTelemetryProgramClient) InitializeDeviceLatencySamples(ctx contex
111111
OriginDevicePK: config.OriginDevicePK,
112112
TargetDevicePK: config.TargetDevicePK,
113113
LinkPK: config.LinkPK,
114-
Epoch: config.Epoch,
114+
Epoch: *config.Epoch,
115115
}
116116

117117
c.accounts[accountKey] = make([]telemetry.Sample, 0)
@@ -127,7 +127,7 @@ func (c *memoryTelemetryProgramClient) WriteDeviceLatencySamples(ctx context.Con
127127
OriginDevicePK: config.OriginDevicePK,
128128
TargetDevicePK: config.TargetDevicePK,
129129
LinkPK: config.LinkPK,
130-
Epoch: config.Epoch,
130+
Epoch: *config.Epoch,
131131
}
132132

133133
if _, ok := c.accounts[accountKey]; !ok {

0 commit comments

Comments
 (0)