-
Notifications
You must be signed in to change notification settings - Fork 0
/
gateway.go
144 lines (123 loc) · 3.22 KB
/
gateway.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package gateway
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/wooyang2018/corechain/example/base"
"github.com/wooyang2018/corechain/example/pb"
"github.com/wooyang2018/corechain/logger"
"google.golang.org/grpc"
)
// Gateway is the HTTP gateway service: it bundles the service configuration,
// a logger and the HTTP server that is created when the gateway is run.
type Gateway struct {
// scfg is the service configuration passed to NewGateway.
scfg *base.ServConf
// log is used for lifecycle messages and the per-request access log.
log logger.Logger
// server is assigned inside runGateway; it is nil until then.
server *http.Server
// isInit is set to true by NewGateway; Run and Exit check it first.
isInit bool
// exitOnce guards stopGateway so that Exit runs it at most once.
exitOnce *sync.Once
}
// NewGateway builds a Gateway from the given service configuration.
//
// It returns an error when scfg is nil or when the logger cannot be created.
// On success the returned Gateway is marked as initialized, so Run and Exit
// will act on it.
func NewGateway(scfg *base.ServConf) (*Gateway, error) {
	if scfg == nil {
		return nil, fmt.Errorf("param error")
	}

	// Report a logger construction failure to the caller instead of
	// discarding it and continuing with a logger that may be unusable.
	log, err := logger.NewLogger("", base.SubModName)
	if err != nil {
		return nil, fmt.Errorf("create gateway logger: %w", err)
	}

	return &Gateway{
		scfg:     scfg,
		log:      log,
		isInit:   true,
		exitOnce: &sync.Once{},
	}, nil
}
// Run starts the gateway service and blocks until it exits.
//
// It returns an error when the Gateway's isInit flag is false or when
// runGateway reports a failure; otherwise it returns nil.
func (t *Gateway) Run() error {
	if !t.isInit {
		return errors.New("gateway not init")
	}

	// Start the gateway; this call blocks until the gateway exits.
	if err := t.runGateway(); err != nil {
		// The logger is called elsewhere in this file with a message followed
		// by key/value pairs, so the error is passed as an "err" field instead
		// of relying on a printf verb inside the message.
		t.log.Error("gateway abnormal exit", "err", err)
		return err
	}

	t.log.Debug("gateway exit")
	return nil
}
// Exit stops the gateway service and releases its resources.
//
// It is idempotent: the shutdown routine runs through exitOnce, so repeated
// calls execute it only once. When the Gateway's isInit flag is false the
// call does nothing.
func (t *Gateway) Exit() {
	if t.isInit {
		t.exitOnce.Do(t.stopGateway)
	}
}
// runGateway registers the gateway handlers on a new mux, wraps the mux with
// interupt and serves it over HTTP on the configured gateway port. It blocks
// until the HTTP server stops.
//
// It returns nil when the server stopped with http.ErrServerClosed, and a
// wrapped error when handler registration or serving fails.
func (t *Gateway) runGateway() error {
	// ctx is handed to the handler registrations and is cancelled when this
	// function returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	mux := runtime.NewServeMux()
	opts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithInitialWindowSize(t.scfg.InitWindowSize),
		grpc.WithWriteBufferSize(t.scfg.WriteBufSize),
		grpc.WithInitialConnWindowSize(t.scfg.InitConnWindowSize),
		grpc.WithReadBufferSize(t.scfg.ReadBufSize),
	}

	// NOTE(review): the gRPC endpoint is built from GWPort, the same port the
	// HTTP server below listens on. Confirm this is intended and that the
	// endpoint should not be the RPC server's own port.
	rpcEndpoint := fmt.Sprintf(":%d", t.scfg.GWPort)
	if err := pb.RegisterMXchainHandlerFromEndpoint(ctx, mux, rpcEndpoint, opts); err != nil {
		return fmt.Errorf("register MXchain gateway handler: %w", err)
	}
	if t.scfg.EnableEndorser {
		if err := pb.RegisterXendorserHandlerFromEndpoint(ctx, mux, rpcEndpoint, opts); err != nil {
			return fmt.Errorf("register Xendorser gateway handler: %w", err)
		}
	}

	// NOTE(review): the server is created without read/write timeouts;
	// consider setting at least ReadHeaderTimeout.
	addr := fmt.Sprintf(":%d", t.scfg.GWPort)
	t.server = &http.Server{
		Addr:    addr,
		Handler: t.interupt(mux),
	}

	// http.ErrServerClosed is what ListenAndServe reports after Shutdown or
	// Close, so it is treated as a normal exit rather than a failure.
	if err := t.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		return fmt.Errorf("serve gateway on %s: %w", addr, err)
	}
	return nil
}
// stopGateway shuts down the HTTP server if one has been created.
//
// Shutdown is called with a background context, i.e. without a deadline.
// A shutdown error is logged instead of being silently dropped.
func (t *Gateway) stopGateway() {
	if t.server == nil {
		return
	}
	if err := t.server.Shutdown(context.Background()); err != nil {
		t.log.Error("gateway shutdown failed", "err", err)
	}
}
// interupt wraps h with optional CORS handling and a per-request debug log.
//
// When AdapterAllowCROS is enabled and the request carries an Origin header,
// that origin is echoed back in Access-Control-Allow-Origin. A preflight
// request (OPTIONS with Access-Control-Request-Method) is answered by
// preflightHandler and is not forwarded to h. Every other request is served
// by h and then logged at debug level.
func (t *Gateway) interupt(h http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Allow CORS requests.
		// Note: reflecting arbitrary origins is dangerous in a production
		// environment; do not enable this without consideration.
		if t.scfg.AdapterAllowCROS {
			// The response headers depend on the request's Origin header, so
			// caches must key on it; otherwise a response produced for one
			// origin could be replayed to another.
			w.Header().Add("Vary", "Origin")
			if origin := r.Header.Get("Origin"); origin != "" {
				w.Header().Set("Access-Control-Allow-Origin", origin)
				if r.Method == http.MethodOptions && r.Header.Get("Access-Control-Request-Method") != "" {
					t.preflightHandler(w, r)
					return
				}
			}
		}

		h.ServeHTTP(w, r)

		// Request log, written after the wrapped handler has returned.
		t.log.Debug("gateway access request", "ip", r.RemoteAddr, "method", r.Method, "url", r.URL.Path)
	})
}
// preflightHandler answers a CORS preflight request by advertising the
// request headers and HTTP methods that are allowed. It only sets response
// headers; it does not write a status code or body itself.
func (t *Gateway) preflightHandler(w http.ResponseWriter, r *http.Request) {
	respHeader := w.Header()

	allowedHeaders := strings.Join([]string{"Content-Type", "Accept"}, ",")
	respHeader.Set("Access-Control-Allow-Headers", allowedHeaders)

	allowedMethods := strings.Join([]string{
		http.MethodGet,
		http.MethodHead,
		http.MethodPost,
		http.MethodPut,
		http.MethodDelete,
	}, ",")
	respHeader.Set("Access-Control-Allow-Methods", allowedMethods)
}