-
Notifications
You must be signed in to change notification settings - Fork 53
/
http.go
85 lines (69 loc) · 2.21 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package agent
import (
"context"
"sync"
"sync/atomic"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util/push"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/rancher/opni/pkg/clients"
"github.com/rancher/opni/pkg/health"
"github.com/rancher/opni/pkg/plugins/apis/apiextensions"
"github.com/rancher/opni/plugins/metrics/pkg/apis/remotewrite"
)
// HttpServer is the agent-side HTTP API extension. It registers the push
// route (see ConfigureRoutes) and forwards incoming write requests through
// the remote write client installed with SetRemoteWriteClient.
type HttpServer struct {
	apiextensions.UnsafeHTTPAPIExtensionServer

	logger *zap.SugaredLogger

	// remoteWriteClientMu guards remoteWriteClient: SetRemoteWriteClient
	// takes the write lock, pushFunc takes the read lock.
	remoteWriteClientMu sync.RWMutex
	remoteWriteClient   clients.Locker[remotewrite.RemoteWriteClient]

	// conditions receives the CondRemoteWrite status updates.
	conditions health.ConditionTracker

	// enabled gates pushFunc and is toggled by SetEnabled.
	enabled atomic.Bool
}
// NewHttpServer returns an HttpServer that reports conditions to ct and
// keeps lg as its logger. The returned server starts out disabled and has
// no remote write client until SetEnabled and SetRemoteWriteClient are
// called.
func NewHttpServer(ct health.ConditionTracker, lg *zap.SugaredLogger) *HttpServer {
	srv := new(HttpServer)
	srv.conditions = ct
	srv.logger = lg
	return srv
}
// SetEnabled switches the push API on or off.
//
// Enabling marks CondRemoteWrite as pending with an empty reason; disabling
// clears CondRemoteWrite. In both cases the condition is updated before the
// enabled flag is stored.
func (s *HttpServer) SetEnabled(enabled bool) {
	if !enabled {
		s.conditions.Clear(CondRemoteWrite)
		s.enabled.Store(false)
		return
	}

	s.conditions.Set(CondRemoteWrite, health.StatusPending, "")
	s.enabled.Store(true)
}
// SetRemoteWriteClient replaces the client used by pushFunc. The swap is
// done under the write lock, so it waits for any push that currently holds
// the read lock.
func (s *HttpServer) SetRemoteWriteClient(client clients.Locker[remotewrite.RemoteWriteClient]) {
	s.remoteWriteClientMu.Lock()
	s.remoteWriteClient = client
	s.remoteWriteClientMu.Unlock()
}
// ConfigureRoutes registers this server's routes on router:
//
//   - POST /api/agent/push, served by push.Handler wrapping pushFunc
//   - the pprof handlers, under /debug/plugin_metrics/pprof
func (s *HttpServer) ConfigureRoutes(router *gin.Engine) {
	// 100<<20 == 104857600. Passed as the first argument of push.Handler;
	// presumably the maximum accepted request size in bytes (confirm
	// against the cortex push package).
	const hundredMiB = 100 << 20

	pushHandler := push.Handler(hundredMiB, nil, s.pushFunc)
	router.POST("/api/agent/push", gin.WrapH(pushHandler))

	pprof.Register(router, "/debug/plugin_metrics/pprof")
}
// pushFunc forwards writeReq through the currently configured remote write
// client and returns the result of its Push call.
//
// A codes.Unavailable status error is returned, and nothing is forwarded,
// when:
//   - the server is disabled ("api not enabled");
//   - no client has been installed via SetRemoteWriteClient;
//   - the client hands the callback a nil RemoteWriteClient (CondRemoteWrite
//     is also set to pending in this case);
//   - the client's Use call reports false.
//
// The last three cases all use the message "gateway not connected".
func (s *HttpServer) pushFunc(ctx context.Context, writeReq *cortexpb.WriteRequest) (writeResp *cortexpb.WriteResponse, writeErr error) {
	if !s.enabled.Load() {
		return nil, status.Errorf(codes.Unavailable, "api not enabled")
	}

	// The read lock is held for the whole push so SetRemoteWriteClient
	// cannot swap the client while it is in use.
	s.remoteWriteClientMu.RLock()
	defer s.remoteWriteClientMu.RUnlock()

	if s.remoteWriteClient == nil {
		return nil, status.Errorf(codes.Unavailable, "gateway not connected")
	}

	ok := s.remoteWriteClient.Use(func(rwc remotewrite.RemoteWriteClient) {
		if rwc == nil {
			s.conditions.Set(CondRemoteWrite, health.StatusPending, "gateway not connected")
			// Surface the failure to the caller. Without this the function
			// would return (nil, nil) and the request would look successful
			// even though nothing was forwarded.
			writeErr = status.Errorf(codes.Unavailable, "gateway not connected")
			return
		}
		writeResp, writeErr = rwc.Push(ctx, writeReq)
	})
	if !ok {
		return nil, status.Errorf(codes.Unavailable, "gateway not connected")
	}
	return writeResp, writeErr
}