-
Notifications
You must be signed in to change notification settings - Fork 8
/
app.go
320 lines (275 loc) · 8.33 KB
/
app.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
package main
import (
"context"
"net"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
"github.com/alecthomas/kong"
formatter "github.com/bluexlab/logrus-formatter"
otlp_util "github.com/bluexlab/otlp-util-go"
"github.com/gobuffalo/pop"
"github.com/gobuffalo/pop/logging"
"github.com/openebl/openebl/pkg/bu_server/api"
"github.com/openebl/openebl/pkg/bu_server/broker"
"github.com/openebl/openebl/pkg/bu_server/business_unit"
"github.com/openebl/openebl/pkg/bu_server/cert"
"github.com/openebl/openebl/pkg/bu_server/manager"
"github.com/openebl/openebl/pkg/bu_server/storage/postgres"
"github.com/openebl/openebl/pkg/bu_server/webhook"
"github.com/openebl/openebl/pkg/config"
"github.com/openebl/openebl/pkg/util"
"github.com/sirupsen/logrus"
)
// appName is the service name passed to the OTLP exporter in runServer.
const appName string = "bu-server"
// CLI describes the command line that App.Run hands to kong.Parse: three
// subcommands (server, migrate, broker) and one global flag selecting the
// configuration file.
type CLI struct {
// Server is the "server" subcommand; it declares no options of its own.
Server struct {
} `cmd:"" help:"Run the server"`
// Migrate is the "migrate" subcommand.
Migrate struct {
// Path is the directory holding the migration files (default "migrations").
Path string `short:"p" long:"path" help:"Path to the migration files" type:"existingdir" default:"migrations"`
} `cmd:"" help:"Migrate the database"`
// Broker is the "broker" subcommand; it declares no options of its own.
Broker struct {
} `cmd:"" help:"Run the broker to send/receive messages with the relay server"`
// Config is the path of the configuration file (default "config.yaml");
// every subcommand loads it with config.FromFile.
Config string `short:"c" long:"config" help:"Path to the configuration file" type:"existingfile" default:"config.yaml"`
}
// Config is the application configuration that each subcommand loads from
// the file named by CLI.Config. The yaml tags give the key of every field.
type Config struct {
// Database is handed unchanged to the API server, the Manager server, the
// webhook processor and the broker storage; runMigrate reads its individual
// connection fields.
Database util.PostgresDatabaseConfig `yaml:"database"`
// Server holds the host and port that runServer joins into the API
// server's LocalAddress.
Server struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
} `yaml:"server"`
// Manager holds the host and port that runServer joins into the Manager
// server's LocalAddress.
Manager struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
} `yaml:"manager"`
// Broker holds the settings runBroker forwards to the broker and the
// certificate manager.
Broker struct {
RelayServer string `yaml:"relay_server"`
// NOTE(review): the unit of CheckInterval is not visible in this file —
// confirm against the broker package.
CheckInterval int `yaml:"check_interval"`
BatchSize int `yaml:"batch_size"`
// CertServer is passed to cert.WithCertServerURL.
CertServer string `yaml:"cert_server"`
} `yaml:"broker"`
// Webhook holds the settings runServer copies into webhook.Config.
Webhook struct {
// NOTE(review): the units of CheckInterval and Timeout are not visible in
// this file — confirm against the webhook package.
CheckInterval int `yaml:"check_interval"`
BatchSize int `yaml:"batch_size"`
Timeout int `yaml:"timeout"`
MaxRetry int `yaml:"max_retry"`
} `yaml:"webhook"`
// OTLPEndpoint enables the OTLP exporter in runServer when non-empty.
OTLPEndpoint string `yaml:"otlp_endpoint"`
}
// App is the stateless entry point of the program; Run parses the command
// line and dispatches to the selected subcommand.
type App struct{}
// Run initialises the logger, parses the command line with kong and invokes
// the handler of the selected subcommand. A command without a registered
// handler is silently ignored.
func (a *App) Run() {
	formatter.InitLogger()

	var cli CLI
	parsed := kong.Parse(&cli, kong.UsageOnError())

	// Dispatch table: subcommand name as reported by kong -> handler.
	handlers := map[string]func(CLI){
		"server":  a.runServer,
		"migrate": a.runMigrate,
		"broker":  a.runBroker,
	}
	if handle, known := handlers[parsed.Command()]; known {
		handle(cli)
	}
}
// runServer implements the "server" subcommand.
//
// It loads the configuration named by cli.Config, starts the OTLP exporter
// when an endpoint is configured, creates the API server, the Manager server
// and the webhook processor, and runs all three until SIGINT or SIGTERM is
// received. On a signal the two servers are closed with a 30 second budget
// and the function waits for the background goroutines to return.
//
// Set-up failures terminate the process with exit code 128, run-time and
// shutdown failures with exit code 1. Note that os.Exit does not run deferred
// functions.
func (a *App) runServer(cli CLI) {
	// rootCtx is deliberately never reassigned: the signal context and the
	// shutdown context below get their own variables so that closures and
	// defers always observe the context they were written for.
	rootCtx := context.Background()

	var appConfig Config
	if err := config.FromFile(cli.Config, &appConfig); err != nil {
		logrus.Errorf("failed to load config: %v", err)
		os.Exit(128)
	}

	if endpoint := appConfig.OTLPEndpoint; endpoint != "" {
		exporter, err := otlp_util.InitExporter(
			otlp_util.WithContext(rootCtx),
			otlp_util.WithEndPoint(endpoint),
			otlp_util.WithServiceName(appName),
			otlp_util.WithInSecure(),
			otlp_util.WithErrorHandler(func(err error) {
				logrus.Warnf("OTLP error: %v", err)
			}),
		)
		if err != nil {
			logrus.Errorf("failed to initialize OTLP exporter: %v", err)
			os.Exit(128)
		}
		defer func() {
			// Use a fresh, bounded context: this defer runs after every other
			// context created in this function has already been cancelled.
			exporterCtx, cancelExporter := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancelExporter()
			// Best effort: the process is exiting, so the result is ignored.
			_ = exporter.Shutdown(exporterCtx)
		}()
	}

	apiConfig := api.APIConfig{
		Database:     appConfig.Database,
		LocalAddress: net.JoinHostPort(appConfig.Server.Host, strconv.Itoa(appConfig.Server.Port)),
	}
	apiServer, err := api.NewAPIWithConfig(apiConfig)
	if err != nil {
		logrus.Errorf("failed to create API server: %v", err)
		os.Exit(128)
	}

	managerAPIConfig := manager.ManagerAPIConfig{
		Database:     appConfig.Database,
		LocalAddress: net.JoinHostPort(appConfig.Manager.Host, strconv.Itoa(appConfig.Manager.Port)),
	}
	managerServer, err := manager.NewManagerAPI(managerAPIConfig)
	if err != nil {
		logrus.Errorf("failed to create Manager server: %v", err)
		os.Exit(128)
	}

	// sigCtx is cancelled on the first SIGINT/SIGTERM.
	sigCtx, stop := signal.NotifyContext(rootCtx, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	processorConfig := webhook.Config{
		Database:      appConfig.Database,
		CheckInterval: appConfig.Webhook.CheckInterval,
		BatchSize:     appConfig.Webhook.BatchSize,
		Timeout:       appConfig.Webhook.Timeout,
		MaxRetry:      appConfig.Webhook.MaxRetry,
	}
	processor, err := webhook.NewProcessorWithConfig(processorConfig)
	if err != nil {
		logrus.Errorf("failed to create webhook processor: %v", err)
		os.Exit(128)
	}

	// Register all goroutines before starting any of them; calling Add from
	// inside a goroutine races with Wait.
	var wg sync.WaitGroup
	wg.Add(3)
	go func() {
		defer wg.Done()
		if err := apiServer.Run(); err != nil {
			logrus.Errorf("failed to run API server: %v", err)
			os.Exit(1)
		}
	}()
	go func() {
		defer wg.Done()
		if err := managerServer.Run(); err != nil {
			logrus.Errorf("failed to run Manager server: %v", err)
			os.Exit(1)
		}
	}()
	go func() {
		defer wg.Done()
		processor.Run(sigCtx)
	}()

	// listen for the stop signal
	<-sigCtx.Done()

	// Restore default behavior on the signals we are listening to
	stop()
	logrus.Info("shutting down gracefully, press Ctrl+C again to force")

	shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := apiServer.Close(shutdownCtx); err != nil {
		logrus.Warnf("failed to close API server: %v", err)
		os.Exit(1)
	}
	if err := managerServer.Close(shutdownCtx); err != nil {
		logrus.Warnf("failed to close Manager server: %v", err)
		os.Exit(1)
	}
	wg.Wait()
}
// runMigrate implements the "migrate" subcommand: it loads the configuration
// named by cli.Config, connects to the configured Postgres database (trying
// to create it first) and applies the file migrations found under
// cli.Migrate.Path. Set-up failures exit with code 128, a failed migration
// with code 1.
func (a *App) runMigrate(cli CLI) {
	var cfg Config
	if loadErr := config.FromFile(cli.Config, &cfg); loadErr != nil {
		logrus.Errorf("failed to load config: %v", loadErr)
		os.Exit(128)
	}

	// Forward pop's log output to logrus. Levels without an entry in the
	// table (logging.SQL in particular) are dropped.
	forward := map[logging.Level]func(string, ...interface{}){
		logging.Debug: logrus.Debugf,
		logging.Info:  logrus.Infof,
		logging.Warn:  logrus.Warnf,
		logging.Error: logrus.Errorf,
	}
	pop.SetLogger(func(lvl logging.Level, s string, args ...interface{}) {
		if logf, ok := forward[lvl]; ok {
			logf(s, args...)
		}
	})

	// Describe and create the database connection.
	dbCfg := cfg.Database
	details := &pop.ConnectionDetails{
		Dialect:  "postgres",
		Database: dbCfg.Database,
		Host:     dbCfg.Host,
		Port:     strconv.Itoa(dbCfg.Port),
		User:     dbCfg.User,
		Password: dbCfg.Password,
	}
	conn, connErr := pop.NewConnection(details)
	if connErr != nil {
		logrus.Errorf("failed to create connection: %v", connErr)
		os.Exit(128)
	}

	// Try to create the database; a failure here is only reported as a
	// warning and the migration is attempted regardless.
	if createErr := conn.Dialect.CreateDB(); createErr != nil {
		logrus.Warnf("failed to create database: %v", createErr)
	}

	migrator, migratorErr := pop.NewFileMigrator(cli.Migrate.Path, conn)
	if migratorErr != nil {
		logrus.Errorf("failed to create migrator: %v", migratorErr)
		os.Exit(128)
	}
	// An empty SchemaPath keeps the migrator from trying to dump the schema.
	migrator.SchemaPath = ""

	// Apply all pending migrations.
	if upErr := migrator.Up(); upErr != nil {
		logrus.Errorf("failed to migrate: %v", upErr)
		os.Exit(1)
	}
}
// runBroker implements the "broker" subcommand.
//
// It loads the configuration named by cli.Config, builds the Postgres
// storage, certificate manager, webhook controller and business-unit manager,
// and runs the broker (client ID "default") until SIGINT or SIGTERM is
// received. On a signal the broker is closed with a 30 second budget and the
// function waits for the broker goroutine to return.
//
// Set-up failures terminate the process with exit code 128, run-time and
// shutdown failures with exit code 1. Note that os.Exit does not run deferred
// functions.
func (a *App) runBroker(cli CLI) {
	var appConfig Config
	if err := config.FromFile(cli.Config, &appConfig); err != nil {
		logrus.Errorf("failed to load config: %v", err)
		os.Exit(128)
	}

	dbStorage, err := postgres.NewStorageWithConfig(appConfig.Database)
	if err != nil {
		logrus.Errorf("failed to create database connection: %v", err)
		os.Exit(128)
	}

	certMgr := cert.NewCertManager(cert.WithCertStore(dbStorage), cert.WithCertServerURL(appConfig.Broker.CertServer))
	webhookCtrl := webhook.NewWebhookController(dbStorage)
	buMgr := business_unit.NewBusinessUnitManager(dbStorage, certMgr, webhookCtrl, nil)

	brokerService, err := broker.NewBroker(
		broker.WithClientID("default"),
		broker.WithCheckInterval(appConfig.Broker.CheckInterval),
		broker.WithBatchSize(appConfig.Broker.BatchSize),
		broker.WithRelayServer(appConfig.Broker.RelayServer),
		broker.WithInboxStore(dbStorage),
		broker.WithOutboxStore(dbStorage),
		broker.WithCertManager(certMgr),
		broker.WithBUManager(buMgr),
	)
	if err != nil {
		logrus.Errorf("failed to create broker: %v", err)
		os.Exit(128)
	}

	// sigCtx is cancelled on the first SIGINT/SIGTERM. It has its own
	// variable so the goroutine below never observes the shutdown context.
	sigCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Register the goroutine before starting it; calling Add from inside the
	// goroutine races with Wait.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := brokerService.Run(sigCtx); err != nil {
			logrus.Errorf("failed to run broker: %v", err)
			os.Exit(1)
		}
	}()

	// listen for the stop signal
	<-sigCtx.Done()

	// Restore default behavior on the signals we are listening to
	stop()
	logrus.Info("shutting down gracefully, press Ctrl+C again to force")

	shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := brokerService.Close(shutdownCtx); err != nil {
		logrus.Warnf("failed to close broker: %v", err)
		os.Exit(1)
	}
	wg.Wait()
	logrus.Info("broker stopped")
}