Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
  • Loading branch information
yhl25 committed Jun 14, 2024
1 parent 61dec39 commit af58b3a
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 18 deletions.
6 changes: 4 additions & 2 deletions pkg/isb/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,20 @@ type Header struct {
Headers map[string]string
}

// MessageID is the message ID
// MessageID is the message ID of the message which is used for exactly-once-semantics.
type MessageID struct {
// VertexName is the name of the vertex
VertexName string
// Offset is the offset of the message
// NOTE: should be unique across the replicas of the vertex, that is the
// reason we don't have a separate replica field in the MessageID
Offset string
// Index is the index of the message
Index int32
}

// String returns the string representation of the MessageID
func (id *MessageID) String() string {
func (id MessageID) String() string {
return fmt.Sprintf("%s-%s-%d", id.VertexName, id.Offset, id.Index)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/stores/redis/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,11 @@ func (f forwardReadWritePerformance) WhereTo(_ []string, _ []string, _ string) (
}

func (f forwardReadWritePerformance) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) {
return testutils.CopyUDFTestApply(ctx, "test-vertex", message)
return testutils.CopyUDFTestApply(ctx, "testVertex", message)
}

func (f forwardReadWritePerformance) ApplyMapStream(ctx context.Context, message *isb.ReadMessage, writeMessageCh chan<- isb.WriteMessage) error {
return testutils.CopyUDFTestApplyStream(ctx, "test-vertex", writeMessageCh, message)
return testutils.CopyUDFTestApplyStream(ctx, "testVertex", writeMessageCh, message)
}

func (suite *ReadWritePerformance) SetupSuite() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/isb/stores/redis/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (bw *BufferWrite) Write(_ context.Context, messages []isb.Message) ([]isb.O
for idx, message := range messages {
// Reference the Payload in Body directly when writing to Redis ISB to avoid extra marshaling.
// TODO: revisit directly Payload reference when Body structure changes
errs[idx] = script.Run(ctx, bw.Client, []string{bw.GetHashKeyName(message.EventTime), bw.Stream}, message.Header.ID, message.Header, message.Body.Payload, bw.BufferWriteInfo.minId.String()).Err()
errs[idx] = script.Run(ctx, bw.Client, []string{bw.GetHashKeyName(message.EventTime), bw.Stream}, message.Header.ID.String(), message.Header, message.Body.Payload, bw.BufferWriteInfo.minId.String()).Err()
}
} else {
var scriptMissing bool
Expand Down
14 changes: 7 additions & 7 deletions pkg/sources/forward/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,9 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
df.opts.logger.Errorw("failed to apply source transformer", zap.Error(m.Err))
return
}
// update toBuffers
// for each message, we will determine where to send the message.
for _, message := range m.WriteMessages {
if err = df.whereToStep(message, messageToStep, m.ReadMessage); err != nil {
if err = df.whereToStep(message, messageToStep); err != nil {
df.opts.logger.Errorw("failed in whereToStep", zap.Error(err))
return
}
Expand Down Expand Up @@ -715,14 +715,14 @@ func (df *DataForward) applyTransformer(ctx context.Context, readMessage *isb.Re
}

// whereToStep executes the WhereTo interfaces and then updates the to step's writeToBuffers buffer.
func (df *DataForward) whereToStep(writeMessage *isb.WriteMessage, messageToStep map[string][][]isb.Message, readMessage *isb.ReadMessage) error {
func (df *DataForward) whereToStep(writeMessage *isb.WriteMessage, messageToStep map[string][][]isb.Message) error {
// call WhereTo and drop it on errors
to, err := df.toWhichStepDecider.WhereTo(writeMessage.Keys, writeMessage.Tags, writeMessage.ID.String())
if err != nil {
df.opts.logger.Errorw("failed in whereToStep", zap.Error(isb.MessageWriteErr{
Name: df.reader.GetName(),
Header: readMessage.Header,
Body: readMessage.Body,
Header: writeMessage.Header,
Body: writeMessage.Body,
Message: fmt.Sprintf("WhereTo failed, %s", err),
}))

Expand All @@ -745,8 +745,8 @@ func (df *DataForward) whereToStep(writeMessage *isb.WriteMessage, messageToStep
if _, ok := messageToStep[t.ToVertexName]; !ok {
df.opts.logger.Errorw("failed in whereToStep", zap.Error(isb.MessageWriteErr{
Name: df.reader.GetName(),
Header: readMessage.Header,
Body: readMessage.Body,
Header: writeMessage.Header,
Body: writeMessage.Body,
Message: fmt.Sprintf("no such destination (%s)", t.ToVertexName),
}))
}
Expand Down
7 changes: 4 additions & 3 deletions test/api-e2e/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api_e2e
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -58,7 +59,7 @@ func (s *APISuite) TestISBSVC() {
for !strings.Contains(getISBSVCBody, `"status":"healthy"`) {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
s.T().Fatalf("failed to get namespaces/isb-services: %v", ctx.Err())
}
default:
Expand Down Expand Up @@ -102,7 +103,7 @@ func (s *APISuite) TestISBSVCReplica1() {
for !strings.Contains(getISBSVCBody, `"status":"healthy"`) {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
s.T().Fatalf("failed to get namespaces/isb-services: %v", ctx.Err())
}
default:
Expand Down Expand Up @@ -206,7 +207,7 @@ func (s *APISuite) TestPipeline1() {
for strings.Contains(getPipelineISBsBody, "errMsg") {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
s.T().Fatalf("failed to get piplines/isbs: %v", ctx.Err())
}
default:
Expand Down
7 changes: 4 additions & 3 deletions test/http-e2e/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
"fmt"
"testing"

. "github.com/numaproj/numaflow/test/fixtures"
"github.com/stretchr/testify/suite"

. "github.com/numaproj/numaflow/test/fixtures"
)

//go:generate kubectl apply -f testdata/http-auth-fake-secret.yaml -n numaflow-system
Expand All @@ -43,8 +44,8 @@ func (s *HTTPSuite) TestHTTPSourcePipeline() {
cmd := fmt.Sprintf("kubectl -n %s get svc -lnumaflow.numaproj.io/pipeline-name=%s,numaflow.numaproj.io/vertex-name=%s | grep -v CLUSTER-IP | grep -v headless", Namespace, "http-source", "in")
w.Exec("sh", []string{"-c", cmd}, OutputRegexp("http-source-in"))

w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("no-id")))
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("no-id")))
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("no-id"))).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("no-id")))
// No x-numaflow-id, expect 2 outputs
w.Expect().SinkContains("out", "no-id", SinkCheckWithContainCount(2))

Expand Down

0 comments on commit af58b3a

Please sign in to comment.