diff --git a/pkg/daemon/server/service/rater/pod_tracker.go b/pkg/daemon/server/service/rater/pod_tracker.go index 6ea9aee16..769d99fef 100644 --- a/pkg/daemon/server/service/rater/pod_tracker.go +++ b/pkg/daemon/server/service/rater/pod_tracker.go @@ -124,12 +124,15 @@ func (pt *PodTracker) isActive(vertexName, podName string) bool { // using the vertex headless service to check if a pod exists or not. // example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc:2469/metrics url := fmt.Sprintf("https://%s.%s.%s.svc:%v/metrics", podName, pt.pipeline.Name+"-"+vertexName+"-headless", pt.pipeline.Namespace, v1alpha1.VertexMetricsPort) - if _, err := pt.httpClient.Head(url); err != nil { + resp, err := pt.httpClient.Head(url) + if err != nil { // during performance test (100 pods per vertex), we never saw a false negative, meaning every time isActive returns false, // it truly means the pod doesn't exist. // in reality, we can imagine that a pod can be active but the Head request times out for some reason and returns an incorrect false, // if we ever observe such case, we can think about adding retry here. + pt.log.Debugf("Failed to check if pod %s is active: %v", podName, err) return false } + _ = resp.Body.Close() return true } diff --git a/pkg/daemon/server/service/rater/rater.go b/pkg/daemon/server/service/rater/rater.go index 4e87582b7..54da577d3 100644 --- a/pkg/daemon/server/service/rater/rater.go +++ b/pkg/daemon/server/service/rater/rater.go @@ -218,40 +218,42 @@ func sleep(ctx context.Context, duration time.Duration) { func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodReadCount { // scrape the read total metric from pod metric port url := fmt.Sprintf("https://%s.%s.%s.svc:%v/metrics", podName, r.pipeline.Name+"-"+vertexName+"-headless", r.pipeline.Namespace, v1alpha1.VertexMetricsPort) - if res, err := r.httpClient.Get(url); err != nil { + resp, err := r.httpClient.Get(url) + if err != nil { r.log.Errorf("failed reading the metrics endpoint, %v", err.Error()) return nil + } + defer resp.Body.Close() + + textParser := expfmt.TextParser{} + result, err := textParser.TextToMetricFamilies(resp.Body) + if err != nil { + r.log.Errorf("failed parsing to prometheus metric families, %v", err.Error()) + return nil + } + var readTotalMetricName string + if vertexType == "reduce" { + readTotalMetricName = "reduce_isb_reader_read_total" } else { - textParser := expfmt.TextParser{} - result, err := textParser.TextToMetricFamilies(res.Body) - if err != nil { - r.log.Errorf("failed parsing to prometheus metric families, %v", err.Error()) - return nil - } - var readTotalMetricName string - if vertexType == "reduce" { - readTotalMetricName = "reduce_isb_reader_read_total" - } else { - readTotalMetricName = "forwarder_read_total" - } - if value, ok := result[readTotalMetricName]; ok && value != nil && len(value.GetMetric()) > 0 { - metricsList := value.GetMetric() - partitionReadCount := make(map[string]float64) - for _, ele := range metricsList { - partitionName := "" - for _, label := range ele.Label { - if label.GetName() == "partition_name" { - partitionName = label.GetValue() - } + readTotalMetricName = "forwarder_read_total" + } + if value, ok := result[readTotalMetricName]; ok && value != nil && len(value.GetMetric()) > 0 { + metricsList := value.GetMetric() + partitionReadCount := make(map[string]float64) + for _, ele := range metricsList { + partitionName := "" + for _, label := range ele.Label { + if label.GetName() == "partition_name" { + partitionName = label.GetValue() } - partitionReadCount[partitionName] = ele.Counter.GetValue() } - podReadCount := &PodReadCount{podName, partitionReadCount} - return podReadCount - } else { - r.log.Errorf("failed getting the read total metric, the metric is not available.") - return nil + partitionReadCount[partitionName] = ele.Counter.GetValue() } + podReadCount := &PodReadCount{podName, partitionReadCount} + return podReadCount + } else { + r.log.Errorf("failed getting the read total metric, the metric is not available.") + return nil } }