Skip to content

Commit

Permalink
fix: resource leak inside daemon server (#837)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
  • Loading branch information
yhl25 committed Jul 8, 2023
1 parent 1f19a74 commit b660b6d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 29 deletions.
5 changes: 4 additions & 1 deletion pkg/daemon/server/service/rater/pod_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,15 @@ func (pt *PodTracker) isActive(vertexName, podName string) bool {
// Use the vertex headless service to check whether a pod exists.
// Example for the 0th pod: https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc:2469/metrics
url := fmt.Sprintf("https://%s.%s.%s.svc:%v/metrics", podName, pt.pipeline.Name+"-"+vertexName+"-headless", pt.pipeline.Namespace, v1alpha1.VertexMetricsPort)
if _, err := pt.httpClient.Head(url); err != nil {
resp, err := pt.httpClient.Head(url)
if err != nil {
// During performance tests (100 pods per vertex), we never saw a false negative: every time isActive returned false,
// the pod truly did not exist.
// In reality, a pod could be active while the Head request times out for some reason, producing an incorrect false;
// if we ever observe such a case, we can consider adding a retry here.
pt.log.Debugf("Failed to check if pod %s is active: %v", podName, err)
return false
}
_ = resp.Body.Close()
return true
}
58 changes: 30 additions & 28 deletions pkg/daemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,40 +218,42 @@ func sleep(ctx context.Context, duration time.Duration) {
func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodReadCount {
// scrape the read total metric from pod metric port
url := fmt.Sprintf("https://%s.%s.%s.svc:%v/metrics", podName, r.pipeline.Name+"-"+vertexName+"-headless", r.pipeline.Namespace, v1alpha1.VertexMetricsPort)
if res, err := r.httpClient.Get(url); err != nil {
resp, err := r.httpClient.Get(url)
if err != nil {
r.log.Errorf("failed reading the metrics endpoint, %v", err.Error())
return nil
}
defer resp.Body.Close()

textParser := expfmt.TextParser{}
result, err := textParser.TextToMetricFamilies(resp.Body)
if err != nil {
r.log.Errorf("failed parsing to prometheus metric families, %v", err.Error())
return nil
}
var readTotalMetricName string
if vertexType == "reduce" {
readTotalMetricName = "reduce_isb_reader_read_total"
} else {
textParser := expfmt.TextParser{}
result, err := textParser.TextToMetricFamilies(res.Body)
if err != nil {
r.log.Errorf("failed parsing to prometheus metric families, %v", err.Error())
return nil
}
var readTotalMetricName string
if vertexType == "reduce" {
readTotalMetricName = "reduce_isb_reader_read_total"
} else {
readTotalMetricName = "forwarder_read_total"
}
if value, ok := result[readTotalMetricName]; ok && value != nil && len(value.GetMetric()) > 0 {
metricsList := value.GetMetric()
partitionReadCount := make(map[string]float64)
for _, ele := range metricsList {
partitionName := ""
for _, label := range ele.Label {
if label.GetName() == "partition_name" {
partitionName = label.GetValue()
}
readTotalMetricName = "forwarder_read_total"
}
if value, ok := result[readTotalMetricName]; ok && value != nil && len(value.GetMetric()) > 0 {
metricsList := value.GetMetric()
partitionReadCount := make(map[string]float64)
for _, ele := range metricsList {
partitionName := ""
for _, label := range ele.Label {
if label.GetName() == "partition_name" {
partitionName = label.GetValue()
}
partitionReadCount[partitionName] = ele.Counter.GetValue()
}
podReadCount := &PodReadCount{podName, partitionReadCount}
return podReadCount
} else {
r.log.Errorf("failed getting the read total metric, the metric is not available.")
return nil
partitionReadCount[partitionName] = ele.Counter.GetValue()
}
podReadCount := &PodReadCount{podName, partitionReadCount}
return podReadCount
} else {
r.log.Errorf("failed getting the read total metric, the metric is not available.")
return nil
}
}

Expand Down

0 comments on commit b660b6d

Please sign in to comment.