-
Notifications
You must be signed in to change notification settings - Fork 19
/
blockexplorer.go
144 lines (118 loc) · 4.65 KB
/
blockexplorer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package blockexplorer
import (
"context"
"fmt"
"code.vegaprotocol.io/vega/blockexplorer/api"
ourGrpc "code.vegaprotocol.io/vega/blockexplorer/api/grpc"
"code.vegaprotocol.io/vega/blockexplorer/config"
"code.vegaprotocol.io/vega/blockexplorer/store"
"code.vegaprotocol.io/vega/libs/net/pipe"
"code.vegaprotocol.io/vega/logging"
pb "code.vegaprotocol.io/vega/protos/blockexplorer/api/v1"
"golang.org/x/sync/errgroup"
)
// BlockExplorer groups together every component of the block explorer
// service: its configuration, logger, store, the gRPC API with its two
// servers, and the HTTP-facing handlers, portal and gateway. Instances are
// assembled by NewFromConfig and driven by Run.
type BlockExplorer struct {
// config is the configuration the instance was built from.
config config.Config
// log is the logger created from config.Logging and handed to the components.
log *logging.Logger
// store is the store backing the gRPC API; Run migrates it and stop closes it.
store *store.Store
// blockExplorerGrpcServer is the gRPC API implementation shared by both
// the internal and the external gRPC servers.
blockExplorerGrpcServer pb.BlockExplorerServiceServer
// internalGRPCServer serves the API over grpcPipeConn.
internalGRPCServer *ourGrpc.Server
// externalGRPCServer serves the API over the portal's gRPC listener.
externalGRPCServer *ourGrpc.Server
// grpcPipeConn is the "grpc-pipe" pipe used by the grpc-ui handler, the
// REST handler and the internal gRPC server.
grpcPipeConn *pipe.Pipe
// grpcUI is the grpc-ui web front end handler, registered on the gateway.
grpcUI *api.GRPCUIHandler
// restAPI is the REST handler proxying to the gRPC API, registered on the gateway.
restAPI *api.RESTHandler
// portal provides the listeners used by the gateway and the external gRPC server.
portal *api.Portal
// gateway is where the HTTP handlers (grpcUI, restAPI) are registered.
gateway *api.Gateway
}
// NewFromConfig assembles a BlockExplorer from the given configuration.
//
// It creates the logger and the store, wires the HTTP handlers (grpc-ui and
// REST) to the gateway, and builds two gRPC servers sharing a single API
// implementation. Nothing is started here; call Run to start serving.
func NewFromConfig(config config.Config) *BlockExplorer {
	apiCfg := config.API
	logger := logging.NewLoggerFromConfig(config.Logging)

	explorer := &BlockExplorer{
		config: config,
		log:    logger,
		store:  store.MustNewStore(config.Store, logger),
		// The pipe is a fake connection that lets in-process clients talk
		// to the gRPC API.
		grpcPipeConn: pipe.NewPipe("grpc-pipe"),
	}

	// grpc-ui: a web front end that reaches the gRPC API through the pipe.
	explorer.grpcUI = api.NewGRPCUIHandler(logger, explorer.grpcPipeConn, apiCfg.GRPCUI)

	// REST API generated by grpc-rest, proxying to the gRPC API. It shares
	// the same pipe as grpc-ui rather than having a dedicated one.
	explorer.restAPI = api.NewRESTHandler(logger, explorer.grpcPipeConn, apiCfg.REST)

	// gRPC runs over HTTP/2 and wants to own its connection. The portal
	// relies on cMux, which creates dummy listeners and peeks at the incoming
	// stream: HTTP/2 traffic goes to the gRPC server, everything else is
	// dispatched to the gateway and from there to the registered handler.
	explorer.portal = api.NewPortal(apiCfg, logger)

	// The gateway gathers all the HTTP handlers into one big serve mux.
	explorer.gateway = api.NewGateway(logger, apiCfg.Gateway, explorer.portal.GatewayListener())
	explorer.gateway.Register(explorer.grpcUI, apiCfg.GRPCUI.Endpoint)
	explorer.gateway.Register(explorer.restAPI, apiCfg.REST.Endpoint)

	// Main gRPC API, exposed twice: internally over the pipe and externally
	// over the portal's gRPC listener.
	explorer.blockExplorerGrpcServer = ourGrpc.NewBlockExplorerAPI(explorer.store, apiCfg.GRPC, logger)
	explorer.internalGRPCServer = ourGrpc.NewServer(
		apiCfg.GRPC, logger, explorer.blockExplorerGrpcServer, explorer.grpcPipeConn,
	)
	explorer.externalGRPCServer = ourGrpc.NewServer(
		apiCfg.GRPC, logger, explorer.blockExplorerGrpcServer, explorer.portal.GRPCListener(),
	)

	return explorer
}
// Run starts every server of the block explorer along with the store
// migration, then blocks until one of them returns an error or ctx is
// cancelled. All components are then stopped, and the first error reported
// by the group (if any) is returned.
//
// NOTE(review): when starting grpc-ui or the REST proxy fails, Run returns
// before the shutdown goroutine is launched, so the servers already started
// in the group are neither stopped nor waited for by this method.
func (a *BlockExplorer) Run(ctx context.Context) error {
	runCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	group, groupCtx := errgroup.WithContext(runCtx)

	tasks := []func() error{
		// Two gRPC services: one for internal connections using a fast
		// pipe, and another one for external connections.
		func() error { return a.internalGRPCServer.Serve() },
		func() error { return a.externalGRPCServer.Serve() },
		// Then the gateway and portal servers.
		func() error { return a.gateway.Serve() },
		func() error { return a.portal.Serve() },
		// The store migration runs alongside the servers.
		func() error { return a.store.Migrate(groupCtx) },
	}
	for _, task := range tasks {
		group.Go(task)
	}

	// With the servers launched, start the HTTP handlers that talk to the
	// gateway.
	if startErr := a.grpcUI.Start(groupCtx); startErr != nil {
		return fmt.Errorf("could not start grpc-ui: %w", startErr)
	}
	if startErr := a.restAPI.Start(groupCtx); startErr != nil {
		return fmt.Errorf("could not start REST<>GRPC proxy: %w", startErr)
	}

	// Shutdown is initiated as soon as the group context is done, i.e. when
	// one of the group's functions returned an error or the parent context
	// got cancelled.
	stopped := make(chan any)
	go func() {
		<-groupCtx.Done()
		a.stop()
		close(stopped)
	}()

	waitErr := group.Wait()

	// Cancel explicitly so the shutdown goroutine is guaranteed to be
	// triggered; waiting on it below would otherwise risk a deadlock.
	cancel()
	<-stopped

	return waitErr
}
// stop shuts down the components of the block explorer one after the other
// and closes the store, logging before and after. Run invokes it from its
// shutdown goroutine once the run context is done.
func (a *BlockExplorer) stop() {
a.log.Info("Shutting down block explorer")
// The gRPC servers are stopped first, external before internal.
a.externalGRPCServer.Stop()
a.internalGRPCServer.Stop()
// Then the HTTP side: gateway and portal, followed by the handlers
// registered on the gateway.
a.gateway.Stop()
a.portal.Stop()
a.restAPI.Stop()
a.grpcUI.Stop()
// The store is closed last, once everything else has been stopped.
a.store.Close()
a.log.Info("Resources released")
}