-
Notifications
You must be signed in to change notification settings - Fork 34
/
service.go
129 lines (105 loc) · 3.45 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Package abn contains the implementation of the A/B/n service (via gRPC)
package abn
// service.go - entry point for A/B/n service
import (
"context"
"fmt"
"net"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
pb "github.com/iter8-tools/iter8/abn/grpc"
util "github.com/iter8-tools/iter8/base"
"github.com/iter8-tools/iter8/base/log"
storageclient "github.com/iter8-tools/iter8/storage/client"
// auth package is necessary to enable authentication with various cloud providers
_ "k8s.io/client-go/plugin/pkg/client/auth"
)
const (
// configEnv is the key handed to util.ReadConfig when loading the abn
// service configuration.
// NOTE(review): presumably the name of an environment variable that points
// to the configuration file — confirm against util.ReadConfig.
configEnv = "ABN_CONFIG_FILE"
// defaultPortNumber is the port the gRPC service listens on when the
// configuration does not specify one.
defaultPortNumber = 50051
)
// newServer constructs the abnServer value that LaunchGRPCServer registers
// as the handler for the ABN gRPC service.
func newServer() *abnServer {
	return new(abnServer)
}
// abnServer is the handler type registered for the ABN gRPC service.
// It embeds pb.UnimplementedABNServer, so any service method not defined
// on abnServer is supplied by the embedded type.
type abnServer struct {
pb.UnimplementedABNServer
}
// Lookup identifies a versionNumber (index to list of versions) that should be used for a given user.
// This method is exposed to gRPC clients.
//
// The application name and user are taken from appMsg and passed to
// lookupInternal. The call fails when lookupInternal returns an error or
// when it yields a negative version number; otherwise the version number is
// returned in a pb.VersionRecommendation.
func (server *abnServer) Lookup(ctx context.Context, appMsg *pb.Application) (*pb.VersionRecommendation, error) {
	application, user := appMsg.GetName(), appMsg.GetUser()

	log.Logger.Tracef("Lookup called for application=%s, user=%s", application, user)
	defer log.Logger.Trace("Lookup completed")

	_, versionNumber, err := lookupInternal(application, user)
	if err != nil {
		log.Logger.Warnf("Lookup(%s,%s) failed: %s", application, user, err.Error())
		return nil, err
	}

	if versionNumber < 0 {
		log.Logger.Warnf("Lookup(%s,%s) returned nil", application, user)
		// err is nil on this path; returning it would report success with a
		// nil response, so surface the missing version as an explicit error.
		return nil, fmt.Errorf("no version available for application %q and user %q", application, user)
	}

	log.Logger.Tracef("Lookup(%s,%s) -> %d", application, user, versionNumber)
	return &pb.VersionRecommendation{
		VersionNumber: int32(versionNumber),
	}, nil
}
// WriteMetric records a metric value reported on behalf of a user.
// The application, user, metric name and value are read from metricMsg and
// forwarded to writeMetricInternal; whatever error that call produces is
// returned to the gRPC client together with an empty response message.
func (server *abnServer) WriteMetric(ctx context.Context, metricMsg *pb.MetricValue) (*emptypb.Empty, error) {
	log.Logger.Trace("WriteMetric called")
	defer log.Logger.Trace("WriteMetric completed")

	writeErr := writeMetricInternal(
		metricMsg.GetApplication(),
		metricMsg.GetUser(),
		metricMsg.GetName(),
		metricMsg.GetValue(),
	)
	reply := &emptypb.Empty{}
	return reply, writeErr
}
// abnConfig defines the configuration of the abn gRPC service.
// LaunchGRPCServer populates it through util.ReadConfig.
type abnConfig struct {
// Port is port number on which the abn gRPC service should listen.
// It is a pointer so that an unset value (nil) can be told apart from an
// explicit one; LaunchGRPCServer substitutes defaultPortNumber when nil.
Port *int `json:"port,omitempty"`
}
// LaunchGRPCServer starts the A/B/n gRPC server and blocks until it stops.
//
// It reads the service configuration (falling back to defaultPortNumber when
// no port is configured), listens on that TCP port on all interfaces,
// registers the ABN service, configures storageclient.MetricsClient, and then
// serves requests.
//
// opts are passed unchanged to grpc.NewServer. Closing stopCh triggers a
// graceful stop of the server. The returned error is non-nil when
// configuration, listening, storage client setup, or serving fails.
func LaunchGRPCServer(opts []grpc.ServerOption, stopCh <-chan struct{}) error {
	// read configuration for the abn service; the callback fills in the
	// default port when none was configured
	conf := &abnConfig{}
	err := util.ReadConfig(configEnv, conf, func() {
		if conf.Port == nil {
			conf.Port = util.IntPointer(defaultPortNumber)
		}
	})
	if err != nil {
		log.Logger.Errorf("unable to read metrics configuration: %s", err.Error())
		return err
	}

	lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *conf.Port))
	if err != nil {
		log.Logger.WithError(err).Error("service failed to listen")
		return err
	}

	grpcServer := grpc.NewServer(opts...)
	pb.RegisterABNServer(grpcServer, newServer())

	// configure MetricsClient if needed
	storageclient.MetricsClient, err = storageclient.GetClient()
	if err != nil {
		log.Logger.Error("Unable to configure metrics storage client ", err)
		// Serve is never reached on this path, so nothing else will release
		// the listener. The close error is deliberately ignored: the storage
		// client failure is the error the caller needs to see.
		_ = lis.Close()
		return err
	}

	// done is closed when this function returns so that the shutdown
	// goroutine below does not outlive the server if Serve exits on its own
	// (or if stopCh is never closed).
	done := make(chan struct{})
	defer close(done)

	go func() {
		select {
		case <-stopCh:
			log.Logger.Warnf("stop channel closed, shutting down")
			grpcServer.GracefulStop()
		case <-done:
		}
	}()

	err = grpcServer.Serve(lis)
	if err != nil {
		log.Logger.WithError(err).Error("failed to start service")
		return err
	}

	return nil
}