-
Notifications
You must be signed in to change notification settings - Fork 45
/
tier2.go
134 lines (106 loc) · 3.15 KB
/
tier2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package app
import (
"context"
"fmt"
"net/url"
dauth "github.com/streamingfast/dauth"
"github.com/streamingfast/dmetrics"
"github.com/streamingfast/shutter"
"github.com/streamingfast/substreams/metrics"
"github.com/streamingfast/substreams/pipeline"
"github.com/streamingfast/substreams/service"
"github.com/streamingfast/substreams/wasm"
"go.uber.org/atomic"
"go.uber.org/zap"
)
type Tier2Config struct {
GRPCListenAddr string // gRPC address where this app will listen to
ServiceDiscoveryURL *url.URL
PipelineOptions []pipeline.Option
MaximumConcurrentRequests uint64
WASMExtensions wasm.WASMExtensioner
Tracing bool
}
type Tier2App struct {
*shutter.Shutter
config *Tier2Config
modules *Tier2Modules
logger *zap.Logger
isReady *atomic.Bool
}
type Tier2Modules struct {
CheckPendingShutDown func() bool
}
func NewTier2(logger *zap.Logger, config *Tier2Config, modules *Tier2Modules) *Tier2App {
return &Tier2App{
Shutter: shutter.New(),
config: config,
modules: modules,
logger: logger,
isReady: atomic.NewBool(false),
}
}
func (a *Tier2App) Run() error {
dmetrics.Register(metrics.MetricSet)
a.logger.Info("running substreams-tier2", zap.Reflect("config", a.config))
if err := a.config.Validate(); err != nil {
return fmt.Errorf("invalid app config: %w", err)
}
var opts []service.Option
//for _, opt := range a.config.PipelineOptions {
// opts = append(opts, service.WithPipelineOptions(opt))
//}
if a.config.Tracing {
opts = append(opts, service.WithModuleExecutionTracing())
}
if a.config.MaximumConcurrentRequests > 0 {
opts = append(opts, service.WithMaxConcurrentRequests(a.config.MaximumConcurrentRequests))
}
opts = append(opts, service.WithReadinessFunc(a.setReadiness))
if a.config.WASMExtensions != nil {
opts = append(opts, service.WithWASMExtensioner(a.config.WASMExtensions))
}
svc, err := service.NewTier2(
a.logger,
opts...,
)
if err != nil {
return err
}
// tier2 always trusts the headers sent from tier1
trustAuth, err := dauth.New("trust://", a.logger)
if err != nil {
return fmt.Errorf("failed to setup trust authenticator: %w", err)
}
a.OnTerminating(func(_ error) { metrics.AppReadinessTier2.SetNotReady() })
go func() {
a.logger.Info("launching gRPC server")
a.isReady.CompareAndSwap(false, true)
metrics.AppReadinessTier2.SetReady()
err := service.ListenTier2(a.config.GRPCListenAddr, a.config.ServiceDiscoveryURL, svc, trustAuth, a.logger, a.HealthCheck)
a.Shutdown(err)
}()
return nil
}
func (a *Tier2App) HealthCheck(ctx context.Context) (bool, interface{}, error) {
return a.IsReady(ctx), nil, nil
}
// IsReady return `true` if the apps is ready to accept requests, `false` is returned
// otherwise.
func (a *Tier2App) IsReady(ctx context.Context) bool {
if a.IsTerminating() {
return false
}
if a.modules.CheckPendingShutDown != nil && a.modules.CheckPendingShutDown() {
return false
}
return a.isReady.Load()
}
func (a *Tier2App) setReadiness(ready bool) {
a.isReady.Store(ready)
}
// Validate inspects itself to determine if the current config is valid according to
// substreams rules.
func (config *Tier2Config) Validate() error {
return nil
}