forked from argoproj/argo-workflows
/
server.go
152 lines (134 loc) · 4.35 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package metrics
import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	log "github.com/sirupsen/logrus"
	runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/utils/env"

	tlsutils "github.com/argoproj/argo-workflows/v3/util/tls"
)
// RunServer starts a metrics server
// If 'isDummy' is set to true, the dummy metrics server will be started. If it's false, the prometheus metrics server will be started
func (m *Metrics) RunServer(ctx context.Context, isDummy bool) {
	defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

	// Nothing to serve when metrics are disabled.
	if !m.metricsConfig.Enabled {
		return
	}

	metricsRegistry := prometheus.NewRegistry()
	metricsRegistry.MustRegister(m)

	switch {
	case m.metricsConfig.SameServerAs(m.telemetryConfig):
		// Metrics and telemetry share one server: register the Go runtime
		// collector on the same registry so a single instance serves both.
		metricsRegistry.MustRegister(collectors.NewGoCollector())
	case m.telemetryConfig.Enabled:
		// Telemetry is enabled on a different server: give it its own
		// registry and run it in a separate instance.
		telemetryRegistry := prometheus.NewRegistry()
		telemetryRegistry.MustRegister(collectors.NewGoCollector())
		go runServer(m.telemetryConfig, telemetryRegistry, ctx, isDummy)
	}

	// Run the metrics server and the expired-metric garbage collector.
	go runServer(m.metricsConfig, metricsRegistry, ctx, isDummy)
	go m.garbageCollector(ctx)
}
// runServer serves the given registry over HTTP (or HTTPS when config.Secure
// is set) on config.Port at config.Path until ctx is cancelled, then shuts the
// server down gracefully with a 1 second deadline.
//
// When isDummy is true, the handler answers every request with 200 OK and no
// metrics payload; otherwise the prometheus handler for registry is mounted.
// Listener failures other than a graceful close panic, to surface
// misconfiguration (e.g. port already in use) immediately.
func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.Context, isDummy bool) {
	var handlerOpts promhttp.HandlerOpts
	if config.IgnoreErrors {
		handlerOpts.ErrorHandling = promhttp.ContinueOnError
	}

	name := ""
	mux := http.NewServeMux()
	if isDummy {
		// dummy metrics server responds to all requests with a 200 status, but without providing any metrics data
		name = "dummy metrics server"
		mux.HandleFunc(config.Path, func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusOK)
		})
	} else {
		name = "prometheus metrics server"
		mux.Handle(config.Path, promhttp.HandlerFor(registry, handlerOpts))
	}
	srv := &http.Server{Addr: fmt.Sprintf(":%v", config.Port), Handler: mux}

	if config.Secure {
		// Minimum TLS version is overridable via TLS_MIN_VERSION; defaults to 1.2.
		tlsMinVersion, err := env.GetInt("TLS_MIN_VERSION", tls.VersionTLS12)
		if err != nil {
			panic(err)
		}
		log.Infof("Generating Self Signed TLS Certificates for Telemetry Servers")
		tlsConfig, err := tlsutils.GenerateX509KeyPairTLSConfig(uint16(tlsMinVersion))
		if err != nil {
			panic(err)
		}
		srv.TLSConfig = tlsConfig
		go func() {
			log.Infof("Starting %s at localhost:%v%s", name, config.Port, config.Path)
			// ErrServerClosed is the expected outcome of the graceful Shutdown
			// below; use errors.Is so wrapped errors are matched too.
			if err := srv.ListenAndServeTLS("", ""); !errors.Is(err, http.ErrServerClosed) {
				panic(err)
			}
		}()
	} else {
		go func() {
			log.Infof("Starting %s at localhost:%v%s", name, config.Port, config.Path)
			if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
				panic(err)
			}
		}()
	}

	// Waiting for stop signal
	<-ctx.Done()

	// Shutdown the server gracefully with a 1 second timeout. Use a fresh
	// context: the incoming ctx is already cancelled at this point.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		log.Infof("Unable to shutdown %s at localhost:%v%s", name, config.Port, config.Path)
	} else {
		log.Infof("Successfully shutdown %s at localhost:%v%s", name, config.Port, config.Path)
	}
}
// Describe implements the prometheus.Collector interface, sending the
// descriptors of every metric this instance exposes.
func (m *Metrics) Describe(ch chan<- *prometheus.Desc) {
	for _, metric := range m.allMetrics() {
		ch <- metric.Desc()
	}
	// Delegate to the sub-collectors that manage their own descriptors.
	for _, collector := range []prometheus.Collector{
		m.logMetric,
		K8sRequestTotalMetric,
		PodMissingMetric,
		WorkflowConditionMetric,
	} {
		collector.Describe(ch)
	}
}
// Collect implements the prometheus.Collector interface, emitting the current
// value of every metric this instance exposes.
func (m *Metrics) Collect(ch chan<- prometheus.Metric) {
	for _, metric := range m.allMetrics() {
		ch <- metric
	}
	// Delegate to the sub-collectors that manage their own metrics.
	for _, collector := range []prometheus.Collector{
		m.logMetric,
		K8sRequestTotalMetric,
		PodMissingMetric,
		WorkflowConditionMetric,
	} {
		collector.Collect(ch)
	}
}
// garbageCollector removes custom metrics that have outlived the configured
// TTL, running once per TTL interval until ctx is cancelled.
func (m *Metrics) garbageCollector(ctx context.Context) {
	// A zero TTL disables expiry altogether.
	if m.metricsConfig.TTL == 0 {
		return
	}

	ticker := time.NewTicker(m.metricsConfig.TTL)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			for key, metric := range m.customMetrics {
				if time.Since(metric.lastUpdated) <= m.metricsConfig.TTL {
					continue
				}
				// Expired: drop non-realtime metrics unconditionally; realtime
				// metrics are only dropped once marked completed.
				if !metric.realtime || metric.completed {
					delete(m.customMetrics, key)
				}
			}
		}
	}
}