forked from XTLS/Xray-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
burstobserver.go
108 lines (93 loc) · 2.71 KB
/
burstobserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package burst
import (
"context"
"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/features/extension"
"github.com/xtls/xray-core/features/outbound"
"google.golang.org/protobuf/proto"
"sync"
)
type Observer struct {
config *Config
ctx context.Context
statusLock sync.Mutex
hp *HealthPing
finished *done.Instance
ohm outbound.Manager
}
func (o *Observer) GetObservation(ctx context.Context) (proto.Message, error) {
return &observatory.ObservationResult{Status: o.createResult()}, nil
}
func (o *Observer) createResult() []*observatory.OutboundStatus {
var result []*observatory.OutboundStatus
o.hp.access.Lock()
defer o.hp.access.Unlock()
for name, value := range o.hp.Results {
status := observatory.OutboundStatus{
Alive: value.getStatistics().All != value.getStatistics().Fail,
Delay: value.getStatistics().Average.Milliseconds(),
LastErrorReason: "",
OutboundTag: name,
LastSeenTime: 0,
LastTryTime: 0,
HealthPing: &observatory.HealthPingMeasurementResult{
All: int64(value.getStatistics().All),
Fail: int64(value.getStatistics().Fail),
Deviation: int64(value.getStatistics().Deviation),
Average: int64(value.getStatistics().Average),
Max: int64(value.getStatistics().Max),
Min: int64(value.getStatistics().Min),
},
}
result = append(result, &status)
}
return result
}
func (o *Observer) Type() interface{} {
return extension.ObservatoryType()
}
func (o *Observer) Start() error {
if o.config != nil && len(o.config.SubjectSelector) != 0 {
o.finished = done.New()
o.hp.StartScheduler(func() ([]string, error) {
hs, ok := o.ohm.(outbound.HandlerSelector)
if !ok {
return nil, newError("outbound.Manager is not a HandlerSelector")
}
outbounds := hs.Select(o.config.SubjectSelector)
return outbounds, nil
})
}
return nil
}
func (o *Observer) Close() error {
if o.finished != nil {
o.hp.StopScheduler()
return o.finished.Close()
}
return nil
}
func New(ctx context.Context, config *Config) (*Observer, error) {
var outboundManager outbound.Manager
err := core.RequireFeatures(ctx, func(om outbound.Manager) {
outboundManager = om
})
if err != nil {
return nil, newError("Cannot get depended features").Base(err)
}
hp := NewHealthPing(ctx, config.PingConfig)
return &Observer{
config: config,
ctx: ctx,
ohm: outboundManager,
hp: hp,
}, nil
}
func init() {
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
return New(ctx, config.(*Config))
}))
}