Skip to content

Commit

Permalink
test: verify processing rate match across vertices when UDSource has …
Browse files Browse the repository at this point in the history
…multiple replicas (#1621)

Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
KeranYang authored and whynowy committed Apr 2, 2024
1 parent 3d82431 commit 1735370
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 4 deletions.
2 changes: 2 additions & 0 deletions config/apps/redis/redis-minimal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ metadata:
labels:
app: redis
data:
# maxmemory is set to 100mb to ensure enough storage for running a single e2e test suite.
# a lower number can lead to redis sink write failure.
redis-config: |
maxmemory 100mb
maxmemory-policy allkeys-lru
Expand Down
8 changes: 8 additions & 0 deletions test/udsource-e2e/testdata/simple-source-go.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,17 @@ spec:
image: quay.io/numaio/numaflow-go/source-simple-source:stable
limits:
readBatchSize: 500
scale:
# set it as two pods to be different from the sink such that we can use this pipeline
# to test processing rate is consistent across vertices when they have different replica counts.
min: 2
max: 2
- name: out
sink:
log: {}
scale:
min: 1
max: 1
edges:
- from: in
to: out
82 changes: 78 additions & 4 deletions test/udsource-e2e/udsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@ limitations under the License.
package e2e

import (
"context"
"fmt"
"math"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
daemonclient "github.com/numaproj/numaflow/pkg/daemon/client"
. "github.com/numaproj/numaflow/test/fixtures"
)

Expand All @@ -33,15 +39,15 @@ type UserDefinedSourceSuite struct {
}

func (s *UserDefinedSourceSuite) testSimpleSourceGo() {
s.testSimpleSource("go")
s.testSimpleSource("go", true)
}

func (s *UserDefinedSourceSuite) testSimpleSourceJava() {
s.testSimpleSource("java")
s.testSimpleSource("java", false)
}

func (s *UserDefinedSourceSuite) testSimpleSourcePython() {
s.testSimpleSource("python")
s.testSimpleSource("python", false)
}

func (s *UserDefinedSourceSuite) TestUDSource() {
Expand All @@ -62,7 +68,7 @@ func (s *UserDefinedSourceSuite) TestUDSource() {
wg.Wait()
}

func (s *UserDefinedSourceSuite) testSimpleSource(lang string) {
func (s *UserDefinedSourceSuite) testSimpleSource(lang string, verifyRate bool) {
w := s.Given().Pipeline(fmt.Sprintf("@testdata/simple-source-%s.yaml", lang)).
When().
CreatePipelineAndWait()
Expand All @@ -84,6 +90,74 @@ func (s *UserDefinedSourceSuite) testSimpleSource(lang string) {
w.Expect().VertexPodLogContains("out", "630")
w.Expect().VertexPodLogContains("out", "999")

if verifyRate {
pipelineName := fmt.Sprintf("simple-source-%s", lang)
// wait for the daemon server to come up
w.Expect().DaemonPodsRunning().DaemonPodLogContains(pipelineName, LogDaemonStarted)
// port-forward daemon server
defer w.DaemonPodPortForward(pipelineName, 1234, dfv1.DaemonServicePort).
TerminateAllPodPortForwards()

// verify the processing rate match between source and sink
client, err := daemonclient.NewDaemonServiceClient("localhost:1234")
assert.NoError(s.T(), err)
defer func() {
_ = client.Close()
}()
// timeout the test if rates don't match within 2 minutes.
timer := time.NewTimer(120 * time.Second)
// we use 10-second windows for rate calculation
// wait for 10 seconds for a new timestamped count entry to be added to the rate calculation windows
waitInterval := 10 * time.Second
succeedChan := make(chan struct{})
go func() {
vertexNames := []string{"in", "out"}
for {
var rates []float64
for _, vertexName := range vertexNames {
m, err := client.GetVertexMetrics(context.Background(), pipelineName, vertexName)
assert.NoError(s.T(), err)
assert.Equal(s.T(), pipelineName, *m[0].Pipeline)
oneMinRate := m[0].ProcessingRates["1m"]
rates = append(rates, oneMinRate)
}
if !ratesMatch(rates) {
time.Sleep(waitInterval)
} else {
succeedChan <- struct{}{}
break
}
}
}()
select {
case <-succeedChan:
break
case <-timer.C:
assert.Fail(s.T(), "timed out waiting for processing rate to match across vertices.")
}
timer.Stop()
}
}

func ratesMatch(rates []float64) bool {
if len(rates) <= 1 {
return true
}
firstVal := rates[0]
// the simple source with 2 pods can reach 8k TPS.
// we don't compare until the pipeline is stable.
// using 5k as a threshold
if firstVal < 5000 {
return false
}
for i := 1; i < len(rates); i++ {
diff := math.Abs(firstVal - rates[i])
// 0.1 - the processing rate compared with source vertex should not be off by more than 10%.
if diff > (firstVal * 0.1) {
return false
}
}
return true
}

func TestUserDefinedSourceSuite(t *testing.T) {
Expand Down

0 comments on commit 1735370

Please sign in to comment.