server.go
package indexer

import (
	"context"
	"encoding/json"
	"fmt"
	"runtime"
	"time"

	"github.com/avast/retry-go/v4"
	"github.com/redis/rueidis"
	"github.com/rss3-network/node/config"
	"github.com/rss3-network/node/internal/constant"
	"github.com/rss3-network/node/internal/database"
	"github.com/rss3-network/node/internal/engine"
	"github.com/rss3-network/node/internal/engine/source"
	"github.com/rss3-network/node/internal/engine/worker"
	"github.com/rss3-network/node/internal/node/monitor"
	"github.com/rss3-network/node/internal/stream"
	activityx "github.com/rss3-network/protocol-go/schema/activity"
	"github.com/rss3-network/protocol-go/schema/network"
	"github.com/samber/lo"
	"github.com/sourcegraph/conc/pool"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)
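
// Server wires a task source to a worker: it consumes tasks from the source,
// transforms them into activities, and persists the results and checkpoints.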
type Server struct {
	id             string
	config         *config.Module
	source         engine.Source
	worker         engine.Worker
	databaseClient database.Client
	streamClient   stream.Client
	monitorClient  monitor.Client
	redisClient    rueidis.Client
	// meterTasksCounter is a counter of the number of tasks processed.
	// Deprecated: use meterTasksHistogram instead.
	meterTasksCounter   metric.Int64Counter
	meterTasksHistogram metric.Float64Histogram
	meterCurrentBlock   metric.Int64ObservableGauge
	meterLatestBlock    metric.Int64ObservableGauge
}
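
// Run starts the source and processes incoming task batches until the source
// finishes or reports an error.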
func (s *Server) Run(ctx context.Context) error {
	var (
		// TODO Develop a more effective solution to implement back pressure.
		tasksChan = make(chan *engine.Tasks)
		errorChan = make(chan error)
	)

	zap.L().Info("start node", zap.String("version", constant.BuildVersion()))

	s.source.Start(ctx, tasksChan, errorChan)

	for {
		select {
		case tasks := <-tasksChan:
			retryableFunc := func() error {
				if err := s.handleTasks(ctx, tasks); err != nil {
					return fmt.Errorf("handle tasks: %w", err)
				}

				return nil
			}

			err := retry.Do(retryableFunc,
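				// Attempts(0) makes retry-go retry indefinitely until handleTasks succeeds.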
				retry.Attempts(0),
				retry.Delay(time.Second),            // Set initial delay to 1 second.
				retry.DelayType(retry.BackOffDelay), // Use backoff delay type, increasing delay on each retry.
				retry.MaxDelay(5*time.Minute),
				retry.OnRetry(func(n uint, err error) {
					zap.L().Error("retry handle tasks", zap.Uint("retry", n), zap.Error(err))
				}),
			)
			if err != nil {
				return fmt.Errorf("retry handle tasks: %w", err)
			}
		case err := <-errorChan:
			if err != nil {
				return fmt.Errorf("an error occurred in the source: %w", err)
			}

			return nil
		}
	}
}
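
// handleTasks matches and transforms a batch of tasks into activities, saves the
// activities together with the checkpoint, and pushes them to the stream.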
func (s *Server) handleTasks(ctx context.Context, tasks *engine.Tasks) error {
	// Initialize the attributes of the meter.
	meterTasksCounterAttributes := metric.WithAttributes(
		attribute.String("service", constant.Name),
		attribute.String("worker", s.worker.Name()),
		attribute.Int("tasks", tasks.Len()),
	)

	// Start a new timer to record the time it takes to handle tasks.
	taskTimer := time.Now()

	checkpoint := engine.Checkpoint{
		ID:      s.id,
		Network: s.source.Network(),
		Worker:  s.worker.Name(),
		State:   s.source.State(),
	}

	// Extract the OpenTelemetry context from the tasks.
	ctx = otel.GetTextMapPropagator().Extract(ctx, tasks)

	ctx, span := otel.Tracer("").Start(ctx, "Indexer handleTasks", trace.WithSpanKind(trace.SpanKindConsumer))
	defer span.End()

	span.SetAttributes(
		attribute.String("service", constant.Name),
		attribute.String("worker", s.worker.Name()),
		attribute.Int("tasks", tasks.Len()),
		attribute.String("state", string(checkpoint.State)),
	)

	// If no tasks are returned, only save the checkpoint to the database.
	if tasks.Len() == 0 {
		zap.L().Info("save checkpoint", zap.Any("checkpoint", checkpoint))

		if err := s.databaseClient.SaveCheckpoint(ctx, &checkpoint); err != nil {
			return fmt.Errorf("save checkpoint: %w", err)
		}

		return nil
	}
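	// Match and transform tasks concurrently, capping the pool at 20 goroutines per CPU core.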
	resultPool := pool.NewWithResults[*activityx.Activity]().WithMaxGoroutines(lo.Ternary(tasks.Len() < 20*runtime.NumCPU(), tasks.Len(), 20*runtime.NumCPU()))

	for _, task := range tasks.Tasks {
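		// Shadow the loop variable so each goroutine captures its own task (required before Go 1.22).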
		task := task

		resultPool.Go(func() *activityx.Activity {
			zap.L().Debug("start match task", zap.String("task.id", task.ID()))

			matched, err := s.worker.Match(ctx, task)
			if err != nil {
				zap.L().Error("match task", zap.String("task.id", task.ID()), zap.Error(err))

				return nil
			}

			// If the task does not meet the filter conditions, it will be discarded.
			if !matched {
				zap.L().Warn("unmatched task", zap.String("task.id", task.ID()))

				return nil
			}

			zap.L().Debug("start transform task", zap.String("task.id", task.ID()))

			activity, err := s.worker.Transform(ctx, task)
			if err != nil {
				zap.L().Error("transform task", zap.String("task.id", task.ID()), zap.Error(err))

				return nil
			}

			return activity
		})
	}

	// Filter failed activities.
	activities := lo.Filter(resultPool.Wait(), func(activity *activityx.Activity, _ int) bool {
		return activity != nil
	})

	// Deprecated: use meterTasksHistogram instead.
	s.meterTasksCounter.Add(ctx, int64(tasks.Len()), meterTasksCounterAttributes)

	checkpoint.IndexCount = int64(len(activities))

	// Save activities and checkpoint to the database.
	if err := s.databaseClient.SaveActivities(ctx, activities); err != nil {
		return fmt.Errorf("save %d activities: %w", len(activities), err)
	}

	zap.L().Info("save checkpoint", zap.Any("checkpoint", checkpoint))

	if err := s.databaseClient.SaveCheckpoint(ctx, &checkpoint); err != nil {
		return fmt.Errorf("save checkpoint: %w", err)
	}

	// Record the time it takes to handle tasks.
	duration := time.Since(taskTimer).Seconds()
	s.meterTasksHistogram.Record(ctx, duration, meterTasksCounterAttributes)

	// Push activities to the stream.
	if s.streamClient != nil && len(activities) > 0 {
		if err := s.streamClient.PushActivities(ctx, activities); err != nil {
			return fmt.Errorf("publish %d activities: %w", len(activities), err)
		}
	}

	return nil
}
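
// initializeMeter registers the OpenTelemetry instruments exposed by the indexer.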
func (s *Server) initializeMeter() (err error) {
	// init meter
	meter := otel.GetMeterProvider().Meter(constant.Name)

	if s.meterTasksCounter, err = meter.Int64Counter("rss3_node_tasks"); err != nil {
		return fmt.Errorf("create meter of tasks counter: %w", err)
	}

	if s.meterTasksHistogram, err = meter.Float64Histogram("rss3_node_task_handle_duration_seconds", metric.WithUnit("s")); err != nil {
		return fmt.Errorf("create meter of tasks histogram: %w", err)
	}

	if s.meterCurrentBlock, err = meter.Int64ObservableGauge("rss3_node_current_block", metric.WithInt64Callback(s.currentBlockMetricHandler)); err != nil {
		return fmt.Errorf("failed to observe meter CurrentBlock: %w", err)
	}

	if s.meterLatestBlock, err = meter.Int64ObservableGauge("rss3_node_latest_block", metric.WithInt64Callback(s.latestBlockMetricHandler)); err != nil {
		return fmt.Errorf("failed to observe meter LatestBlock: %w", err)
	}

	return nil
}

// currentBlockMetricHandler reports the current block height/number recorded in the latest checkpoint state.
func (s *Server) currentBlockMetricHandler(ctx context.Context, observer metric.Int64Observer) error {
	go func() {
		// get current block height state
		latestCheckpoint, err := s.databaseClient.LoadCheckpoint(ctx, s.id, s.source.Network(), s.worker.Name())
		if err != nil {
			zap.L().Error("find latest checkpoint", zap.Error(err))

			return
		}

		if latestCheckpoint != nil {
			// Get the current block height/block number from the checkpoint state.
			var state monitor.CheckpointState
			if err := json.Unmarshal(latestCheckpoint.State, &state); err != nil {
				zap.L().Error("unmarshal checkpoint state", zap.Error(err))

				return
			}

			observer.Observe(int64(s.monitorClient.CurrentState(state)), metric.WithAttributes(
				attribute.String("service", constant.Name),
				attribute.String("worker", s.worker.Name()),
			))
		}
	}()

	return nil
}
// latestBlockMetricHandler gets the latest block height/number from the network rpc.
func (s *Server) latestBlockMetricHandler(ctx context.Context, observer metric.Int64Observer) error {
	go func() {
		// get latest block height
		latestBlockHeight, err := s.monitorClient.LatestState(ctx)
		if err != nil {
			zap.L().Error("get latest block height", zap.Error(err))

			return
		}

		observer.Observe(int64(latestBlockHeight), metric.WithAttributes(
			attribute.String("service", constant.Name),
			attribute.String("worker", s.worker.Name())))
	}()

	return nil
}
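
// NewServer builds an indexer Server: it creates the worker, the network monitor
// client, and the meters, then initializes the source from the stored checkpoint.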
func NewServer(ctx context.Context, config *config.Module, databaseClient database.Client, streamClient stream.Client, redisClient rueidis.Client) (server *Server, err error) {
	instance := Server{
		id:             config.ID,
		config:         config,
		databaseClient: databaseClient,
		streamClient:   streamClient,
		redisClient:    redisClient,
	}

	// Initialize worker.
	if instance.worker, err = worker.New(instance.config, databaseClient, instance.redisClient); err != nil {
		return nil, fmt.Errorf("new worker: %w", err)
	}
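	// Select the monitor client for the configured network; networks other than Arweave and Farcaster default to the Ethereum client.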
	switch config.Network {
	case network.Arweave:
		instance.monitorClient, err = monitor.NewArweaveClient()
		if err != nil {
			return nil, fmt.Errorf("new arweave monitorClient: %w", err)
		}
	case network.Farcaster:
		instance.monitorClient, err = monitor.NewFarcasterClient()
		if err != nil {
			return nil, fmt.Errorf("new farcaster monitorClient: %w", err)
		}
	default:
		instance.monitorClient, err = monitor.NewEthereumClient(config.Endpoint)
		if err != nil {
			return nil, fmt.Errorf("new ethereum monitorClient: %w", err)
		}
	}

	if err := instance.initializeMeter(); err != nil {
		return nil, fmt.Errorf("initialize meter: %w", err)
	}
	// Load the checkpoint to initialize the source.
	checkpoint, err := instance.databaseClient.LoadCheckpoint(ctx, instance.id, config.Network, instance.worker.Name())
	if err != nil {
		return nil, fmt.Errorf("load checkpoint: %w", err)
	}

	// Unmarshal the checkpoint state into a map so it can be logged.
	var state map[string]any
	if err := json.Unmarshal(checkpoint.State, &state); err != nil {
		return nil, fmt.Errorf("unmarshal checkpoint state: %w", err)
	}

	zap.L().Info("load checkpoint", zap.String("checkpoint.id", checkpoint.ID), zap.String("checkpoint.network", checkpoint.Network.String()), zap.String("checkpoint.worker", checkpoint.Worker), zap.Any("checkpoint.state", state))
	// Initialize source.
	if instance.source, err = source.New(instance.config, instance.worker.Filter(), checkpoint, databaseClient); err != nil {
		return nil, fmt.Errorf("new source: %w", err)
	}

	return &instance, nil
}