-
Notifications
You must be signed in to change notification settings - Fork 0
/
jsonrpcws_relayer.go
138 lines (125 loc) · 3.5 KB
/
jsonrpcws_relayer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package server
import (
"context"
"github.com/pkg/errors"
"github.com/superisaac/jsoff"
"github.com/superisaac/jsoff/net"
"github.com/superisaac/nodemux/core"
"net/http"
"net/url"
)
// wsPairs maps the ID of a client-side websocket session to the upstream
// websocket client that session has been paired with. Entries are inserted
// by delegateRPC when a session is first paired and removed by onClose.
//
// NOTE(review): no lock guards this map in the code visible here; confirm
// that every read and write is serialized by the caller.
var (
wsPairs = make(map[string]*jsoffnet.WSClient)
)
// JSONRPCWSRelayer is an HTTP handler that accepts JSON-RPC messages over a
// websocket session and relays them to an upstream endpoint: a paired
// upstream websocket when one can be selected, otherwise (for request
// messages only) an RPC delegator.
type JSONRPCWSRelayer struct {
// rootCtx is the context passed to upstream sends and delegated RPC calls.
rootCtx context.Context
// acc, when non-nil, is used instead of the account carried by the
// request context. Nothing in this file assigns it.
acc *Acc
// rpcHandler is the websocket handler that ServeHTTP forwards to.
rpcHandler *jsoffnet.WSHandler
}
// NewJSONRPCWSRelayer creates a relayer bound to rootCtx and installs two
// hooks on its websocket handler:
//
//   - on session close, the session's upstream pairing is dropped;
//   - for every message without a dedicated handler, the rate limit is
//     checked and, if the request is allowed, the message is relayed.
//
// The rate limit comes from the account found in the HTTP request context
// when there is one, and from the server configuration otherwise.
func NewJSONRPCWSRelayer(rootCtx context.Context) *JSONRPCWSRelayer {
	relayer := &JSONRPCWSRelayer{rootCtx: rootCtx}

	handler := jsoffnet.NewWSHandler(rootCtx, nil)
	handler.Actor.OnClose(relayer.onClose)
	handler.Actor.OnMissing(func(req *jsoffnet.RPCRequest) (interface{}, error) {
		httpReq := req.HttpRequest()

		var (
			limitKey string
			limits   RatelimitConfig
		)
		if account := AccFromContext(httpReq.Context()); account == nil {
			// Anonymous request: fall back to the server-wide limit.
			limits = ServerConfigFromContext(rootCtx).Ratelimit
		} else {
			// Prefer the configured username, falling back to the account name.
			limitKey = account.Config.Username
			if limitKey == "" {
				limitKey = account.Name
			}
			limits = account.Config.Ratelimit
		}

		allowed, err := checkRatelimit(httpReq, limitKey, limits, true)
		if err != nil {
			return nil, err
		}
		if !allowed {
			return nil, jsoffnet.SimpleResponse{
				Code: 429,
				Body: []byte("rate limit exceeded!"),
			}
		}
		return relayer.delegateRPC(req)
	})

	relayer.rpcHandler = handler
	return relayer
}
// onClose forgets the upstream pairing recorded for session s and publishes
// the new number of pairs to the metrics gauge.
func (jr *JSONRPCWSRelayer) onClose(s jsoffnet.RPCSession) {
	sessionID := s.SessionID()
	delete(wsPairs, sessionID)

	remaining := len(wsPairs)
	metricsWSPairsCount.Set(float64(remaining))
}
// delegateRPC relays one incoming message to an upstream, trying in order:
//
//  1. the upstream websocket already paired with this session;
//  2. a newly selected websocket endpoint, which is then paired with the
//     session for later messages;
//  3. for request messages only, the RPC delegator of the chain's namespace.
//
// It returns an error when the request carries no session, a 404 response
// when no account or no delegator is available, and a 400 response when the
// message is not a request and no websocket upstream could be selected.
// Messages relayed over a websocket yield a nil result; replies travel back
// through the upstream's message callback.
func (jr *JSONRPCWSRelayer) delegateRPC(req *jsoffnet.RPCRequest) (interface{}, error) {
	httpReq := req.HttpRequest()
	msg := req.Msg()
	session := req.Session()
	if session == nil {
		return nil, errors.New("request data is not websocket conn")
	}

	// A relayer-level account takes precedence over the per-request one.
	account := jr.acc
	if account == nil {
		if account = AccFromContext(httpReq.Context()); account == nil {
			return nil, jsoffnet.SimpleResponse{
				Code: 404,
				Body: []byte("acc not found"),
			}
		}
	}

	mux := nodemuxcore.GetMultiplexer()

	// Already paired: forward the message on the existing upstream.
	if upstream, paired := wsPairs[session.SessionID()]; paired {
		return nil, upstream.Send(jr.rootCtx, msg)
	}

	// First message of this session: select an upstream websocket endpoint
	// and remember the (session, upstream) pair.
	if endpoint, found := mux.SelectWebsocketEndpoint(account.Chain, "", -2); found {
		target, err := url.Parse(endpoint.Config.StreamingUrl)
		if err != nil {
			return nil, err
		}

		upstream := jsoffnet.NewWSClient(target)
		upstream.OnMessage(func(incoming jsoff.Message) {
			session.Send(incoming)
		})
		upstream.OnClose(func() {
			jr.onClose(session)
		})

		wsPairs[session.SessionID()] = upstream
		metricsWSPairsCount.Set(float64(len(wsPairs)))
		return nil, upstream.Send(jr.rootCtx, msg)
	}

	// No websocket upstream: only request messages have a remaining route.
	if !msg.IsRequest() {
		return nil, jsoffnet.SimpleResponse{
			Code: 400,
			Body: []byte("no websocket upstreams"),
		}
	}

	// Requests can still be served through the namespace's RPC delegator.
	delegator := nodemuxcore.GetDelegatorFactory().GetRPCDelegator(account.Chain.Namespace)
	if delegator == nil {
		return nil, jsoffnet.SimpleResponse{
			Code: 404,
			Body: []byte("backend not found"),
		}
	}
	// The assertion result is not checked; msg.IsRequest() was true above.
	request, _ := msg.(*jsoff.RequestMessage)
	reply, err := delegator.DelegateRPC(jr.rootCtx, mux, account.Chain, request, httpReq)
	return reply, err
}
// ServeHTTP hands the request to the relayer's websocket handler; its
// signature matches http.Handler.
func (jr *JSONRPCWSRelayer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	handler := jr.rpcHandler
	handler.ServeHTTP(w, r)
}