/
main.go
118 lines (99 loc) · 3.47 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main
import (
"context"
"fmt"
"log"
"os"
mflog "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/uuid"
agent "github.com/ultravioletrs/agent/agent"
"github.com/ultravioletrs/agent/agent/api"
agentgrpc "github.com/ultravioletrs/agent/agent/api/grpc"
httpapi "github.com/ultravioletrs/agent/agent/api/http"
"github.com/ultravioletrs/agent/agent/tracing"
"github.com/ultravioletrs/agent/internal"
"github.com/ultravioletrs/agent/internal/env"
jaegerclient "github.com/ultravioletrs/agent/internal/jaeger"
"github.com/ultravioletrs/agent/internal/server"
grpcserver "github.com/ultravioletrs/agent/internal/server/grpc"
httpserver "github.com/ultravioletrs/agent/internal/server/http"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
const (
svcName = "agent"
envPrefixHTTP = "AGENT_HTTP_"
envPrefixGRPC = "AGENT_GRPC_"
defSvcHTTPPort = "9031"
defSvcGRPCPort = "7002"
)
type config struct {
LogLevel string `env:"AGENT_LOG_LEVEL" envDefault:"info"`
JaegerURL string `env:"AGENT_JAEGER_URL" envDefault:"http://localhost:14268/api/traces"`
InstanceID string `env:"AGENT_INSTANCE_ID" envDefault:""`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
var cfg config
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
logger, err := mflog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf(err.Error())
}
if cfg.InstanceID == "" {
cfg.InstanceID, err = uuid.New().ID()
if err != nil {
log.Fatalf("Failed to generate instanceID: %s", err)
}
}
tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID)
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)
svc := newService(logger, tracer)
var httpServerConfig = server.Config{Port: defSvcHTTPPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err))
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svc, cfg.InstanceID), logger)
var grpcServerConfig = server.Config{Port: defSvcGRPCPort}
if err := env.Parse(&grpcServerConfig, env.Options{Prefix: envPrefixGRPC}); err != nil {
log.Fatalf("failed to load %s gRPC server configuration : %s", svcName, err.Error())
}
registerAgentServiceServer := func(srv *grpc.Server) {
reflection.Register(srv)
agent.RegisterAgentServiceServer(srv, agentgrpc.NewServer(svc))
}
gs := grpcserver.New(ctx, cancel, svcName, grpcServerConfig, registerAgentServiceServer, logger)
g.Go(func() error {
return hs.Start()
})
g.Go(func() error {
return gs.Start()
})
g.Go(func() error {
return server.StopHandler(ctx, cancel, logger, svcName, hs, gs)
})
if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("%s service terminated: %s", svcName, err))
}
}
func newService(logger mflog.Logger, tracer trace.Tracer) agent.Service {
svc := agent.New()
svc = api.LoggingMiddleware(svc, logger)
counter, latency := internal.MakeMetrics(svcName, "api")
svc = api.MetricsMiddleware(svc, counter, latency)
svc = tracing.New(svc, tracer)
return svc
}