forked from smallnest/rpcx
-
Notifications
You must be signed in to change notification settings - Fork 0
/
jsonrpc2.go
238 lines (207 loc) ยท 6.35 KB
/
jsonrpc2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
package server
import (
"context"
"encoding/json"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"github.com/rs/cors"
"github.com/ssdev-go/rpcx/protocol"
"github.com/ssdev-go/rpcx/share"
)
func (s *Server) jsonrpcHandler(w http.ResponseWriter, r *http.Request) {
data, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var req = &jsonrpcRequest{}
err = json.Unmarshal(data, req)
if err != nil {
var res = &jsonrpcRespone{}
res.Error = &JSONRPCError{
Code: CodeParseJSONRPCError,
Message: err.Error(),
}
writeResponse(w, res)
return
}
conn := r.Context().Value(HttpConnContextKey).(net.Conn)
ctx := share.WithValue(r.Context(), RemoteConnContextKey, conn)
if req.ID != nil {
res := s.handleJSONRPCRequest(ctx, req, r.Header)
writeResponse(w, res)
return
}
// notification
go s.handleJSONRPCRequest(ctx, req, r.Header)
}
func (s *Server) handleJSONRPCRequest(ctx context.Context, r *jsonrpcRequest, header http.Header) *jsonrpcRespone {
s.Plugins.DoPreReadRequest(ctx)
var res = &jsonrpcRespone{}
res.ID = r.ID
req := protocol.GetPooledMsg()
if req.Metadata == nil {
req.Metadata = make(map[string]string)
}
if r.ID == nil {
req.SetOneway(true)
}
req.SetMessageType(protocol.Request)
req.SetSerializeType(protocol.JSON)
lastDot := strings.LastIndex(r.Method, ".")
if lastDot <= 0 {
res.Error = &JSONRPCError{
Code: CodeMethodNotFound,
Message: "must contains servicepath and method",
}
return res
}
req.ServicePath = r.Method[:lastDot]
req.ServiceMethod = r.Method[lastDot+1:]
req.Payload = *r.Params
// meta
meta := header.Get(XMeta)
if meta != "" {
metadata, _ := url.ParseQuery(meta)
for k, v := range metadata {
if len(v) > 0 {
req.Metadata[k] = v[0]
}
}
}
auth := header.Get("Authorization")
if auth != "" {
req.Metadata[share.AuthKey] = auth
}
err := s.Plugins.DoPostReadRequest(ctx, req, nil)
if err != nil {
res.Error = &JSONRPCError{
Code: CodeInternalJSONRPCError,
Message: err.Error(),
}
return res
}
err = s.auth(ctx, req)
if err != nil {
s.Plugins.DoPreWriteResponse(ctx, req, nil, err)
res.Error = &JSONRPCError{
Code: CodeInternalJSONRPCError,
Message: err.Error(),
}
s.Plugins.DoPostWriteResponse(ctx, req, req.Clone(), err)
return res
}
resp, err := s.handleRequest(ctx, req)
if r.ID == nil {
return nil
}
s.Plugins.DoPreWriteResponse(ctx, req, nil, err)
if err != nil {
res.Error = &JSONRPCError{
Code: CodeInternalJSONRPCError,
Message: err.Error(),
}
s.Plugins.DoPostWriteResponse(ctx, req, req.Clone(), err)
return res
}
result := json.RawMessage(resp.Payload)
res.Result = &result
s.Plugins.DoPostWriteResponse(ctx, req, req.Clone(), err)
return res
}
func writeResponse(w http.ResponseWriter, res *jsonrpcRespone) {
data, err := json.Marshal(res)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Context-Type", "application/json")
w.Write(data)
}
type CORSOptions struct {
// AllowedOrigins is a list of origins a cross-domain request can be executed from.
// If the special "*" value is present in the list, all origins will be allowed.
// An origin may contain a wildcard (*) to replace 0 or more characters
// (i.e.: http://*.domain.com). Usage of wildcards implies a small performance penalty.
// Only one wildcard can be used per origin.
// Default value is ["*"]
AllowedOrigins []string
// AllowOriginFunc is a custom function to validate the origin. It take the origin
// as argument and returns true if allowed or false otherwise. If this option is
// set, the content of AllowedOrigins is ignored.
AllowOriginFunc func(origin string) bool
// AllowOriginFunc is a custom function to validate the origin. It takes the HTTP Request object and the origin as
// argument and returns true if allowed or false otherwise. If this option is set, the content of `AllowedOrigins`
// and `AllowOriginFunc` is ignored.
AllowOriginRequestFunc func(r *http.Request, origin string) bool
// AllowedMethods is a list of methods the client is allowed to use with
// cross-domain requests. Default value is simple methods (HEAD, GET and POST).
AllowedMethods []string
// AllowedHeaders is list of non simple headers the client is allowed to use with
// cross-domain requests.
// If the special "*" value is present in the list, all headers will be allowed.
// Default value is [] but "Origin" is always appended to the list.
AllowedHeaders []string
// ExposedHeaders indicates which headers are safe to expose to the API of a CORS
// API specification
ExposedHeaders []string
// MaxAge indicates how long (in seconds) the results of a preflight request
// can be cached
MaxAge int
// AllowCredentials indicates whether the request can include user credentials like
// cookies, HTTP authentication or client side SSL certificates.
AllowCredentials bool
// OptionsPassthrough instructs preflight to let other potential next handlers to
// process the OPTIONS method. Turn this on if your application handles OPTIONS.
OptionsPassthrough bool
// Debugging flag adds additional output to debug server side CORS issues
Debug bool
}
// AllowAllCORSOptions returns a option that allows access.
func AllowAllCORSOptions() *CORSOptions {
return &CORSOptions{
AllowedOrigins: []string{"*"},
AllowedMethods: []string{
http.MethodHead,
http.MethodGet,
http.MethodPost,
http.MethodPut,
http.MethodPatch,
http.MethodDelete,
},
AllowedHeaders: []string{"*"},
AllowCredentials: false,
}
}
// SetCORS sets CORS options.
// for example:
//
// cors.Options{
// AllowedOrigins: []string{"foo.com"},
// AllowedMethods: []string{http.MethodGet, http.MethodPost, http.MethodDelete},
// AllowCredentials: true,
// }
//
func (s *Server) SetCORS(options *CORSOptions) {
s.corsOptions = options
}
func (s *Server) startJSONRPC2(ln net.Listener) {
newServer := http.NewServeMux()
newServer.HandleFunc("/", s.jsonrpcHandler)
srv := http.Server{ConnContext: func(ctx context.Context, c net.Conn) context.Context {
return context.WithValue(ctx, HttpConnContextKey, c)
}}
if s.corsOptions != nil {
opt := cors.Options(*s.corsOptions)
c := cors.New(opt)
mux := c.Handler(newServer)
srv.Handler = mux
go srv.Serve(ln)
} else {
srv.Handler = newServer
go srv.Serve(ln)
}
}