Skip to content

Commit

Permalink
Add timeout to endpointset metric collector
Browse files Browse the repository at this point in the history
We have seen deadlocks with endpoint discovery caused by the metric
collector hanging and not releasing the store labels lock. This causes
the endpoint update to hang, which also makes all endpoint readers hang on
acquiring a read lock for the resolved endpoints slice.

This commit makes sure the Collect method on the metrics collector has
a built in timeout to guard against cases where an upstream call never
reads from the collection channel.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed May 6, 2024
1 parent 07838b8 commit cad8f93
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,17 @@ type endpointSetNodeCollector struct {
storeNodes map[component.Component]map[string]int
storePerExtLset map[string]int

logger log.Logger
connectionsDesc *prometheus.Desc
labels []string
}

func newEndpointSetNodeCollector(labels ...string) *endpointSetNodeCollector {
func newEndpointSetNodeCollector(logger log.Logger, labels ...string) *endpointSetNodeCollector {
if len(labels) == 0 {
labels = []string{string(ExternalLabels), string(StoreType)}
}
return &endpointSetNodeCollector{
logger: logger,
storeNodes: map[component.Component]map[string]int{},
connectionsDesc: prometheus.NewDesc(
"thanos_store_nodes_grpc_connections",
Expand Down Expand Up @@ -277,7 +279,12 @@ func (c *endpointSetNodeCollector) Collect(ch chan<- prometheus.Metric) {
lbls = append(lbls, storeTypeStr)
}
}
ch <- prometheus.MustNewConstMetric(c.connectionsDesc, prometheus.GaugeValue, float64(occurrences), lbls...)
select {
case ch <- prometheus.MustNewConstMetric(c.connectionsDesc, prometheus.GaugeValue, float64(occurrences), lbls...):
case <-time.After(1 * time.Second):
level.Warn(c.logger).Log("msg", "failed to collect endpointset metrics", "timeout", 1*time.Second)
return
}
}
}
}
Expand Down Expand Up @@ -319,7 +326,7 @@ func NewEndpointSet(
endpointInfoTimeout time.Duration,
endpointMetricLabels ...string,
) *EndpointSet {
endpointsMetric := newEndpointSetNodeCollector(endpointMetricLabels...)
endpointsMetric := newEndpointSetNodeCollector(logger, endpointMetricLabels...)
if reg != nil {
reg.MustRegister(endpointsMetric)
}
Expand Down

0 comments on commit cad8f93

Please sign in to comment.