/
grpc.go
235 lines (202 loc) · 5.18 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
// Package grpc transparently forwards the grpc protocol using a go-micro client.
package grpc
import (
"context"
"io"
"strings"
"github.com/micro/go-micro"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/service/grpc"
)
// Router will transparently proxy requests to the backend.
// If no backend is specified it will call a service using the client.
// If the service matches the Name it will use the server.DefaultRouter.
type Router struct {
// Name is the name of the local service. Requests whose service equals
// Name are left alone: ServeRequest hands them to server.DefaultRouter
// instead of proxying them. ServeRequest sets it to DefaultName when empty.
Name string
// Backend is a single backend to route to.
// A value of the form address:port is meant to be called as an address;
// any other value is meant to be used as the service name to call.
Backend string
// Endpoint specifies the fixed endpoint to call.
// In the event you proxy to a fixed backend this lets you
// call a single endpoint. When empty, the endpoint of the incoming
// request is used.
Endpoint string
// Client is the client to use for outbound requests.
// ServeRequest sets it to client.DefaultClient when it is nil.
Client client.Client
}
var (
// DefaultName is the default name of this local service. ServeRequest
// falls back to it when Router.Name is empty, and NewService passes it
// as the initial service name option.
DefaultName = "go.micro.proxy"
// DefaultRouter is the default router. NewService installs this shared
// instance and overwrites its Name field with the server's name.
DefaultRouter = &Router{}
)
// readLoop relays the caller's request stream to the backend stream.
//
// Every body read from r is forwarded undecoded: it is wrapped in a
// codec.Message together with r's current header and written through the
// codec of the backend request. The loop returns nil as soon as either the
// read or the write reports io.EOF, and returns any other error unchanged.
func readLoop(r server.Request, s client.Stream) error {
	// outbound request on the backend stream
	backend := s.Request()

	for {
		// raw bytes from the caller; deliberately not decoded
		payload, err := r.Read()
		switch {
		case err == io.EOF:
			return nil
		case err != nil:
			return err
		}

		// carry the caller's header alongside the raw body
		msg := &codec.Message{
			Type:   codec.Request,
			Header: r.Header(),
			Body:   payload,
		}

		// push the raw message to the backend
		switch err = backend.Codec().Write(msg, nil); err {
		case nil:
			// keep relaying
		case io.EOF:
			return nil
		default:
			return err
		}
	}
}
// ServeRequest honours the server.Router interface.
//
// Requests addressed to the router's own Name are handed to
// server.DefaultRouter. Every other request is forwarded as raw bytes over a
// stream opened with p.Client:
//
//   - when Backend is set and contains a ":" it is dialled as an address;
//   - when Backend is set without a ":" it replaces the service name;
//   - when Endpoint is set it replaces the endpoint of the incoming request.
//
// It returns nil once the backend response stream (or the write back to the
// caller) reports io.EOF. Otherwise it returns the first error hit while
// reading the initial request, opening the stream, or relaying responses.
//
// NOTE(review): the Name and Client defaults are written to the Router on the
// request path; confirm this is safe if requests are served concurrently.
func (p *Router) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
	// set the default name e.g local proxy
	if p.Name == "" {
		p.Name = DefaultName
	}

	// set default client
	if p.Client == nil {
		p.Client = client.DefaultClient
	}

	// requests for this service itself are served locally, not proxied
	if req.Service() == p.Name {
		return server.DefaultRouter.ServeRequest(ctx, req, rsp)
	}

	var opts []client.CallOption

	// start from the target named in the incoming request
	service := req.Service()
	endpoint := req.Endpoint()

	// call a specific backend
	if len(p.Backend) > 0 {
		// strings.Split always yields at least one element, so testing
		// len(parts) > 0 could never select the service-name branch.
		// Look for the separator itself instead.
		if strings.Contains(p.Backend, ":") {
			// address:port
			opts = append(opts, client.WithAddress(p.Backend))
		} else {
			// use as service name
			service = p.Backend
		}
	}

	// call a specific endpoint
	if len(p.Endpoint) > 0 {
		endpoint = p.Endpoint
	}

	// read initial request
	body, err := req.Read()
	if err != nil {
		return err
	}

	// create new request with raw bytes body
	creq := p.Client.NewRequest(service, endpoint, &bytes.Frame{body}, client.WithContentType(req.ContentType()))

	// create new stream
	stream, err := p.Client.Stream(ctx, creq, opts...)
	if err != nil {
		return err
	}
	defer stream.Close()

	// relay further client messages to the backend in the background;
	// its result is not observed here
	go readLoop(req, stream)

	// get raw response
	resp := stream.Response()

	// relay backend responses to the caller until either side ends
	for {
		// read backend response body
		data, err := resp.Read()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		// write raw response header to client
		rsp.WriteHeader(resp.Header())

		// write raw response body to client
		err = rsp.Write(data)
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}
// NewSingleHostRouter returns a router which sends requests to a single backend.
//
// It is used by setting it in a new micro service to act as a proxy for a backend.
// The given url is stored verbatim in the router's Backend field.
//
// Usage:
//
// Create a new router to the grpc backend
//
//	r := NewSingleHostRouter("localhost:10001")
//
//	// Create your new service
//	service := micro.NewService(
//		micro.Name("greeter"),
//		// Set the router
//		WithRouter(r),
//	)
//
//	// Run the service
//	service.Run()
func NewSingleHostRouter(url string) *Router {
	router := new(Router)
	router.Backend = url
	return router
}
// NewService returns a new proxy. It acts as a micro service proxy.
// Any request on the transport is routed via the client to a service.
// In the event a backend is specified then it routes to that backend.
// The name of the backend can be a local address:port or a service name.
//
// The service is created with DefaultName and DefaultRouter placed ahead of
// the caller's options, and DefaultRouter.Name is then set to the name the
// resulting server reports.
//
// Usage:
//
// New micro proxy routes via micro client to any service
//
//	proxy := NewService()
//
// OR with address:port routes to local service
//
//	service := NewService(
//		// Sets the backend address
//		proxy.WithBackend("localhost:10001"),
//	)
//
// OR with service name routes to a fixed backend service
//
//	service := NewService(
//		// Sets the backend service
//		proxy.WithBackend("greeter"),
//	)
func NewService(opts ...micro.Option) micro.Service {
	r := DefaultRouter

	// defaults first, followed by whatever the caller supplied
	all := make([]micro.Option, 0, len(opts)+2)
	all = append(all, micro.Name(DefaultName), WithRouter(r))
	all = append(all, opts...)

	// create the new service
	svc := grpc.NewService(all...)

	// keep the router's name in step with the name the server ended up with
	r.Name = svc.Server().Options().Name

	return svc
}