refactor: re-arrange some of the rater implementations (#1036)
Signed-off-by: Keran Yang <yangkr920208@gmail.com>
KeranYang authored and whynowy committed Sep 14, 2023
1 parent b511eff commit 8cbb4d6
Showing 5 changed files with 153 additions and 83 deletions.
21 changes: 17 additions & 4 deletions pkg/daemon/server/service/rater/helper.go
@@ -22,7 +22,20 @@ import (
sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue"
)

const IndexNotFound = -1
const (
indexNotFound = -1

// The following string set is used to identify the vertex type of a pod

// keyVertexTypeReduce is the vertex type string for reduce vertex
keyVertexTypeReduce = "reduce"
// keyVertexTypeSource is the vertex type string for a source vertex
keyVertexTypeSource = "source"
// keyVertexTypeSink is the vertex type string for a sink vertex
keyVertexTypeSink = "sink"
// keyVertexTypeOther is the vertex type string for other vertices
keyVertexTypeOther = "other"
)

// UpdateCount updates the count of processed messages for a pod at a given time
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podReadCounts *PodReadCount) {
@@ -52,7 +65,7 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec
// we consider the last but one element as the end index because the last element might be incomplete
// we can be sure that the last but one element in the queue is complete.
endIndex := len(counts) - 2
if startIndex == IndexNotFound {
if startIndex == indexNotFound {
return 0
}

@@ -98,8 +111,8 @@ func findStartIndex(lookbackSeconds int64, counts []*TimestampedCounts) int {
n := len(counts)
now := time.Now().Truncate(CountWindow).Unix()
if n < 2 || now-counts[n-2].timestamp > lookbackSeconds {
// if the second last element is already outside the lookback window, we return IndexNotFound
return IndexNotFound
// if the second last element is already outside the lookback window, we return indexNotFound
return indexNotFound
}

startIndex := n - 2
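Note on the helper.go changes: IndexNotFound becomes the unexported indexNotFound, and the vertex type strings used by the rater are centralized as keyVertexType* constants. Below is a minimal usage sketch of CalculateRate, assuming code that lives inside the rater package; the queue contents, pod name, and partition name are illustrative, not part of this commit.

// Sketch: querying the processing rate of one partition over a 30s lookback.
q := sharedqueue.New[*TimestampedCounts](1800) // 30 minutes of 10-second windows

now := time.Now().Truncate(CountWindow).Unix()
for i, total := range []float64{100.0, 150.0, 200.0} {
	tc := NewTimestampedCounts(now - int64(20-10*i))
	tc.Update(&PodReadCount{"pod-0", map[string]float64{"partition-0": total}})
	q.Append(tc)
}

// The newest element is treated as incomplete, so only the first two windows
// contribute: (150 - 100) / 10s = 5 messages per second.
rate := CalculateRate(q, 30, "partition-0")
_ = rate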
76 changes: 61 additions & 15 deletions pkg/daemon/server/service/rater/helper_test.go
@@ -146,9 +146,9 @@ func TestCalculateRate(t *testing.T) {
tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 20.0}})
q.Append(tc3)

// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
// tc1 and tc2 are used to calculate the rate
assert.Equal(t, 0.5, CalculateRate(q, 25, "partition1"))
@@ -173,9 +173,9 @@ func TestCalculateRate(t *testing.T) {
tc4.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 80.0}})
q.Append(tc4)

// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
// tc2 and tc3 are used to calculate the rate
assert.Equal(t, 5.0, CalculateRate(q, 25, "partition1"))
@@ -202,11 +202,11 @@ func TestCalculateRate(t *testing.T) {
tc3.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 300.0}})
q.Append(tc3)

// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1"))
// tc1 and tc2 are used to calculate the rate
assert.Equal(t, 15.0, CalculateRate(q, 35, "partition1"))
@@ -229,11 +229,11 @@ func TestCalculateRate(t *testing.T) {
tc3.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 100.0}})
q.Append(tc3)

// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1"))
// tc1 and tc2 are used to calculate the rate
assert.Equal(t, 30.0, CalculateRate(q, 35, "partition1"))
@@ -256,11 +256,11 @@ func TestCalculateRate(t *testing.T) {
tc3.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 100.0}})
q.Append(tc3)

// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1"))
// tc1 and tc2 are used to calculate the rate
assert.Equal(t, 25.0, CalculateRate(q, 35, "partition1"))
@@ -291,9 +291,9 @@ func TestCalculateRate(t *testing.T) {
q.Append(tc4)

// partition1 rate
// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect 0 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
// tc2 and tc3 are used to calculate the rate
assert.Equal(t, 5.0, CalculateRate(q, 25, "partition1"))
@@ -330,4 +330,50 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 0.0, CalculateRate(q, 35, "partition100"))
assert.Equal(t, 0.0, CalculateRate(q, 100, "partition100"))
})

t.Run("multiplePods_givenOnePodHandleMultiplePartitions_whenCalculateRate_thenReturnRate", func(t *testing.T) {
q := sharedqueue.New[*TimestampedCounts](1800)
now := time.Now()

// this test uses an extreme case where pod1 handles 10 messages at a time for each partition, pod2 100, and pod3 1000
tc1 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 30)
tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0, "partition2": 20.0}})
tc1.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 10.0, "partition2": 20.0}})
tc1.Update(&PodReadCount{"pod3", map[string]float64{"partition1": 10.0, "partition2": 20.0}})
q.Append(tc1)
tc2 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 20)
tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 20.0, "partition2": 30.0}})
tc2.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 110.0, "partition2": 120.0}})
tc2.Update(&PodReadCount{"pod3", map[string]float64{"partition1": 1010.0, "partition2": 1020.0}})
q.Append(tc2)
tc3 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 10)
tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 30.0, "partition2": 40.0}})
tc3.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 210.0, "partition2": 220.0}})
tc3.Update(&PodReadCount{"pod3", map[string]float64{"partition1": 2010.0, "partition2": 2020.0}})
q.Append(tc3)
tc4 := NewTimestampedCounts(now.Truncate(time.Second * 10).Unix())
tc4.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 40.0, "partition2": 50.0}})
tc4.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 310.0, "partition2": 320.0}})
tc4.Update(&PodReadCount{"pod3", map[string]float64{"partition1": 3010.0, "partition2": 3020.0}})
q.Append(tc4)

// partition1 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
// tc2 and tc3 are used to calculate the rate
assert.Equal(t, 111.0, CalculateRate(q, 25, "partition1"))
// tc1, 2 and 3 are used to calculate the rate
assert.Equal(t, 111.0, CalculateRate(q, 35, "partition1"))
// tc1, 2 and 3 are used to calculate the rate
assert.Equal(t, 111.0, CalculateRate(q, 100, "partition1"))

// partition2 rate
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition2"))
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition2"))
assert.Equal(t, 111.0, CalculateRate(q, 25, "partition2"))
assert.Equal(t, 111.0, CalculateRate(q, 35, "partition2"))
assert.Equal(t, 111.0, CalculateRate(q, 100, "partition2"))
})
}
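For readers checking the expected values in the new multi-partition test: between any two adjacent complete 10-second windows, each partition's count grows by 10 (pod1) + 100 (pod2) + 1000 (pod3), which is where the 111.0 assertions come from. A quick standalone sanity check of the arithmetic (plain Go, not part of the commit):

package main

import "fmt"

func main() {
	// tc2 -> tc3 deltas for partition1 in the new test case
	pod1 := 30.0 - 20.0     // 10
	pod2 := 210.0 - 110.0   // 100
	pod3 := 2010.0 - 1010.0 // 1000
	window := 10.0          // CountWindow in seconds
	fmt.Println((pod1 + pod2 + pod3) / window) // 111
}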
83 changes: 46 additions & 37 deletions pkg/daemon/server/service/rater/pod_tracker.go
@@ -46,7 +46,7 @@ type PodTracker struct {
}

func NewPodTracker(ctx context.Context, p *v1alpha1.Pipeline, opts ...PodTrackerOption) *PodTracker {
pt := PodTracker{
pt := &PodTracker{
pipeline: p,
log: logging.FromContext(ctx).Named("PodTracker"),
httpClient: &http.Client{
@@ -61,10 +61,10 @@ func NewPodTracker(ctx context.Context, p *v1alpha1.Pipeline, opts ...PodTracker

for _, opt := range opts {
if opt != nil {
opt(&pt)
opt(pt)
}
}
return &pt
return pt
}

type PodTrackerOption func(*PodTracker)
@@ -78,42 +78,51 @@ func WithRefreshInterval(d time.Duration) PodTrackerOption {

func (pt *PodTracker) Start(ctx context.Context) error {
pt.log.Debugf("Starting tracking active pods for pipeline %s...", pt.pipeline.Name)
go func() {
ticker := time.NewTicker(pt.refreshInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
pt.log.Infof("Context is cancelled. Stopping tracking active pods for pipeline %s...", pt.pipeline.Name)
return
case <-ticker.C:
for _, v := range pt.pipeline.Spec.Vertices {
var vType string
if v.IsReduceUDF() {
vType = "reduce"
} else if v.IsASource() {
vType = "source"
} else if v.IsASink() {
vType = "sink"
} else {
vType = "other"
}
for i := 0; i < int(v.Scale.GetMaxReplicas()); i++ {
podName := fmt.Sprintf("%s-%s-%d", pt.pipeline.Name, v.Name, i)
podKey := pt.getPodKey(i, v.Name, vType)
if pt.isActive(v.Name, podName) {
pt.activePods.PushBack(podKey)
} else {
// if the pod is not active, remove it from the active pod list
pt.activePods.Remove(podKey)
}
}
}
pt.log.Debugf("Finished updating the active pod set: %v", pt.activePods.ToString())
go pt.trackActivePods(ctx)
return nil
}

func (pt *PodTracker) trackActivePods(ctx context.Context) {
ticker := time.NewTicker(pt.refreshInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
pt.log.Infof("Context is cancelled. Stopping tracking active pods for pipeline %s...", pt.pipeline.Name)
return
case <-ticker.C:
pt.updateActivePods()
}
}
}

func (pt *PodTracker) updateActivePods() {
for _, v := range pt.pipeline.Spec.Vertices {
vType := getVertexType(v)
for i := 0; i < int(v.Scale.GetMaxReplicas()); i++ {
podName := fmt.Sprintf("%s-%s-%d", pt.pipeline.Name, v.Name, i)
podKey := pt.getPodKey(i, v.Name, vType)
if pt.isActive(v.Name, podName) {
pt.activePods.PushBack(podKey)
} else {
pt.activePods.Remove(podKey)
}
}
}()
return nil
}
pt.log.Debugf("Finished updating the active pod set: %v", pt.activePods.ToString())
}

func getVertexType(v v1alpha1.AbstractVertex) string {
switch {
case v.IsReduceUDF():
return keyVertexTypeReduce
case v.IsASource():
return keyVertexTypeSource
case v.IsASink():
return keyVertexTypeSink
default:
return keyVertexTypeOther
}
}

// LeastRecentlyUsed returns the least recently used pod from the active pod list.
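Note on the pod_tracker.go changes: Start now only launches the background goroutine, while the ticker loop and the per-tick work are split into trackActivePods and updateActivePods, and the vertex-type if/else chain becomes getVertexType. The refresh loop follows the standard ticker-plus-context pattern; a stripped-down, self-contained sketch of that pattern (function names and the callback are illustrative, not the repository's):

package main

import (
	"context"
	"fmt"
	"time"
)

// refreshLoop runs update on every tick until the context is cancelled,
// mirroring the structure of trackActivePods above.
func refreshLoop(ctx context.Context, interval time.Duration, update func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			update()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	refreshLoop(ctx, 100*time.Millisecond, func() { fmt.Println("refreshing active pods") })
}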
52 changes: 27 additions & 25 deletions pkg/daemon/server/service/rater/rater.go
@@ -61,7 +61,9 @@ type Rater struct {
podTracker *PodTracker
// timestampedPodCounts is a map between vertex name and a queue of timestamped counts for that vertex
timestampedPodCounts map[string]*sharedqueue.OverflowQueue[*TimestampedCounts]
options *options
// userSpecifiedLookBackSeconds is a map between vertex name and the user-specified lookback seconds for that vertex
userSpecifiedLookBackSeconds map[string]int64
options *options
}

// PodReadCount is a struct to maintain count of messages read from each partition by a pod
@@ -89,15 +91,17 @@ func NewRater(ctx context.Context, p *v1alpha1.Pipeline, opts ...Option) *Rater
},
Timeout: time.Second * 1,
},
log: logging.FromContext(ctx).Named("Rater"),
timestampedPodCounts: make(map[string]*sharedqueue.OverflowQueue[*TimestampedCounts]),
options: defaultOptions(),
log: logging.FromContext(ctx).Named("Rater"),
timestampedPodCounts: make(map[string]*sharedqueue.OverflowQueue[*TimestampedCounts]),
userSpecifiedLookBackSeconds: make(map[string]int64),
options: defaultOptions(),
}

rater.podTracker = NewPodTracker(ctx, p)
for _, v := range p.Spec.Vertices {
// maintain the total counts of the last 30 minutes(1800 seconds) since we support 1m, 5m, 15m lookback seconds.
rater.timestampedPodCounts[v.Name] = sharedqueue.New[*TimestampedCounts](int(1800 / CountWindow.Seconds()))
rater.userSpecifiedLookBackSeconds[v.Name] = int64(v.Scale.GetLookbackSeconds())
}

for _, opt := range opts {
@@ -213,6 +217,17 @@ func sleep(ctx context.Context, duration time.Duration) {
// getPodReadCounts returns the total number of messages read by the pod
// since a pod can read from multiple partitions, we will return a map of partition to read count.
func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodReadCount {
metricNames := map[string]string{
keyVertexTypeReduce: "reduce_isb_reader_read_total",
keyVertexTypeSource: "source_forwarder_read_total",
keyVertexTypeSink: "sink_forwarder_read_total",
}

readTotalMetricName, ok := metricNames[vertexType]
if !ok {
readTotalMetricName = "forwarder_read_total"
}

// scrape the read total metric from pod metric port
url := fmt.Sprintf("https://%s.%s.%s.svc:%v/metrics", podName, r.pipeline.Name+"-"+vertexName+"-headless", r.pipeline.Namespace, v1alpha1.VertexMetricsPort)
resp, err := r.httpClient.Get(url)
@@ -228,27 +243,22 @@ func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodRea
r.log.Errorf("failed parsing to prometheus metric families, %v", err.Error())
return nil
}
var readTotalMetricName string
if vertexType == "reduce" {
readTotalMetricName = "reduce_isb_reader_read_total"
} else if vertexType == "source" {
readTotalMetricName = "source_forwarder_read_total"
} else if vertexType == "sink" {
readTotalMetricName = "sink_forwarder_read_total"
} else {
readTotalMetricName = "forwarder_read_total"
}
if value, ok := result[readTotalMetricName]; ok && value != nil && len(value.GetMetric()) > 0 {
metricsList := value.GetMetric()
partitionReadCount := make(map[string]float64)
for _, ele := range metricsList {
partitionName := ""
var partitionName string
for _, label := range ele.Label {
if label.GetName() == "partition_name" {
partitionName = label.GetValue()
break
}
}
partitionReadCount[partitionName] = ele.Counter.GetValue()
if partitionName == "" {
r.log.Warnf("Partition name is not found for metric %s", readTotalMetricName)
} else {
partitionReadCount[partitionName] = ele.Counter.GetValue()
}
}
podReadCount := &PodReadCount{podName, partitionReadCount}
return podReadCount
@@ -273,15 +283,7 @@ func (r *Rater) GetRates(vertexName, partitionName string) map[string]float64 {
}

func (r *Rater) buildLookbackSecondsMap(vertexName string) map[string]int64 {
// get the user-specified lookback seconds from the pipeline spec
var userSpecifiedLookBackSeconds int64
// TODO - we can keep a local copy of vertex to lookback seconds mapping to avoid iterating the pipeline spec all the time.
for _, v := range r.pipeline.Spec.Vertices {
if v.Name == vertexName {
userSpecifiedLookBackSeconds = int64(v.Scale.GetLookbackSeconds())
}
}
lookbackSecondsMap := map[string]int64{"default": userSpecifiedLookBackSeconds}
lookbackSecondsMap := map[string]int64{"default": r.userSpecifiedLookBackSeconds[vertexName]}
for k, v := range fixedLookbackSeconds {
lookbackSecondsMap[k] = v
}
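Note on the rater.go changes: each vertex's user-specified lookback seconds are now cached in userSpecifiedLookBackSeconds at construction time, so buildLookbackSecondsMap no longer scans the pipeline spec on every call, and the read-total metric name is resolved through a map with a fallback instead of an if/else chain. A sketch of that lookup-with-fallback, pulled out as a helper for illustration (the metric names are the ones shown in the diff; the helper function itself is not part of the commit):

// readTotalMetricFor returns the read-total metric name for a vertex type,
// falling back to the generic forwarder metric for any other vertex type.
func readTotalMetricFor(vertexType string) string {
	metricNames := map[string]string{
		keyVertexTypeReduce: "reduce_isb_reader_read_total",
		keyVertexTypeSource: "source_forwarder_read_total",
		keyVertexTypeSink:   "sink_forwarder_read_total",
	}
	if name, ok := metricNames[vertexType]; ok {
		return name
	}
	return "forwarder_read_total"
}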
4 changes: 2 additions & 2 deletions pkg/daemon/server/service/rater/timestamped_counts.go
@@ -23,9 +23,9 @@ import (

// TimestampedCounts track the total count of processed messages for a list of pods at a given timestamp
type TimestampedCounts struct {
// timestamp in seconds, is the time when the count is recorded
// timestamp in seconds is the time when the count is recorded
timestamp int64
// the key of podPartitionCount represents the pod name, the value represents partition counts map for the pod
// the key of podPartitionCount represents the pod name, the value represents a partition counts map for the pod
// partition counts map holds mappings between partition name and the count of messages processed by the partition
podPartitionCount map[string]map[string]float64
lock *sync.RWMutex
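Note on the timestamped_counts.go changes: only the field comments are touched. For clarity, the shape of podPartitionCount is pod name -> partition name -> cumulative count of messages read; an illustrative literal (values mirror tc4 in the new helper test, not data from this file):

podPartitionCount := map[string]map[string]float64{
	"pod1": {"partition1": 40.0, "partition2": 50.0},
	"pod2": {"partition1": 310.0, "partition2": 320.0},
	"pod3": {"partition1": 3010.0, "partition2": 3020.0},
}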
