/
chain.go
135 lines (124 loc) · 3.58 KB
/
chain.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package chain
import (
"context"
"fmt"
"strings"
"github.com/wzshiming/bridge"
"github.com/wzshiming/bridge/config"
"github.com/wzshiming/bridge/internal/scheme"
"github.com/wzshiming/schedialer"
"github.com/wzshiming/schedialer/plugins/probe"
"github.com/wzshiming/schedialer/plugins/random"
)
// BridgeChain is a bridger that supports multiple crossing of bridger.
type BridgeChain struct {
DialerFunc func(dialer bridge.Dialer) bridge.Dialer
proto map[string]bridge.Bridger
defaultProto bridge.Bridger
}
// NewBridgeChain create a new BridgeChain.
func NewBridgeChain() *BridgeChain {
return &BridgeChain{
proto: map[string]bridge.Bridger{},
DialerFunc: NewEnvDialer,
}
}
// BridgeChain is multiple crossing of bridge.
func (b *BridgeChain) BridgeChain(dialer bridge.Dialer, addresses ...string) (bridge.Dialer, error) {
if len(addresses) == 0 {
return dialer, nil
}
address := addresses[len(addresses)-1]
d, err := b.Dial(dialer, strings.Split(address, "|"), "")
if err != nil {
return nil, err
}
addresses = addresses[:len(addresses)-1]
if len(addresses) == 0 {
return d, nil
}
return b.BridgeChain(d, addresses...)
}
// BridgeChainWithConfig is multiple crossing of bridge.
func (b *BridgeChain) BridgeChainWithConfig(dialer bridge.Dialer, addresses ...config.Node) (bridge.Dialer, error) {
if len(addresses) == 0 {
return dialer, nil
}
d, err := b.bridgeChainWithConfig(dialer, addresses...)
if err != nil {
return nil, err
}
if b.DialerFunc != nil {
d = b.DialerFunc(d)
}
return d, nil
}
func (b *BridgeChain) bridgeChainWithConfig(dialer bridge.Dialer, addresses ...config.Node) (bridge.Dialer, error) {
if len(addresses) == 0 {
return dialer, nil
}
address := addresses[len(addresses)-1]
d, err := b.Dial(dialer, address.LB, address.Probe)
if err != nil {
return nil, err
}
addresses = addresses[:len(addresses)-1]
if len(addresses) == 0 {
return d, nil
}
return b.bridgeChainWithConfig(d, addresses...)
}
func (b *BridgeChain) Dial(dialer bridge.Dialer, addresses []string, probeUrl string) (bridge.Dialer, error) {
if len(addresses) == 1 {
return b.dialOne(dialer, addresses[0])
}
plugins := []schedialer.Plugin{
random.NewRandom(),
}
if probeUrl != "" {
plugins = append(plugins, probe.NewProbe(probeUrl))
}
return b.dialMulti(dialer, addresses, plugins)
}
func (b *BridgeChain) dialMulti(dialer bridge.Dialer, addresses []string, plugins []schedialer.Plugin) (bridge.Dialer, error) {
if len(addresses) == 1 {
return b.dialOne(dialer, addresses[0])
}
ctx := context.Background()
plugin := schedialer.NewPlugins(plugins...)
for _, address := range addresses {
dial, err := b.dialOne(dialer, address)
if err != nil {
return nil, err
}
proxy := schedialer.Proxy{
Name: address,
Dialer: dial,
}
plugin.AddProxy(ctx, &proxy)
}
return schedialer.NewSchedialer(plugin), nil
}
func (b *BridgeChain) dialOne(dialer bridge.Dialer, address string) (bridge.Dialer, error) {
sch, _, ok := scheme.SplitSchemeAddr(address)
if !ok {
return nil, fmt.Errorf("unsupported protocol format %q", address)
}
bridger, ok := b.proto[sch]
if !ok {
if b.defaultProto == nil {
return nil, fmt.Errorf("unsupported protocol %q", sch)
}
bridger = b.defaultProto
}
return bridger.Bridge(dialer, address)
}
// Register is register a new bridger for BridgeChain.
func (b *BridgeChain) Register(name string, bridger bridge.Bridger) error {
b.proto[name] = bridger
return nil
}
// RegisterDefault is register a default bridger for BridgeChain.
func (b *BridgeChain) RegisterDefault(bridger bridge.Bridger) {
b.defaultProto = bridger
}