Skip to content

Conversation

@jy4096
Copy link
Contributor

@jy4096 jy4096 commented Dec 13, 2022

numaproj/numaflow#335
gRPC development reference: #4 (comment)

Test

Client

Use numaflow-go sdk client code as the client

package server

import (
	"context"
	"testing"
	"time"

	sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1"
	"github.com/numaproj/numaflow-go/pkg/sink/client"
	"github.com/stretchr/testify/assert"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func Test_server_sink(t *testing.T) {
	var ctx = context.Background()
	c, err := client.New(client.WithSockAddr("/tmp/numaflow-test.sock"))
	assert.NoError(t, err)
	defer func() {
		err = c.CloseConn(ctx)
		assert.NoError(t, err)
	}()
	testDatumList := []*sinkpb.Datum{
		{
			Id:        "test_id_0",
			Value:     []byte(`sink_message_success`),
			EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169600, 0))},
			Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})},
		},
		{
			Id:        "test_id_1",
			Value:     []byte(`sink_message_success`),
			EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169600, 0))},
			Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})},
		},
	}
	responseList, err := c.SinkFn(ctx, testDatumList)
	assert.NoError(t, err)
	expectedResponseList := []*sinkpb.Response{
		{
			Id:      "test_id_0",
			Success: true,
			ErrMsg:  "",
		},
		{
			Id:      "test_id_1",
			Success: true,
			ErrMsg:  "",
		},
	}
	assert.Equal(t, expectedResponseList, responseList)
}

Server

Use numaflow-python sdk sink example log code as the server

from typing import List
from pynumaflow.sink import Datum, Responses, Response, UserDefinedSinkServicer


def udsink_handler(datums: List[Datum]) -> Responses:
    responses = Responses()
    for msg in datums:
        print("User Defined Sink", msg.id, msg.value.decode("utf-8"))
        responses.append(Response.as_success(msg.id))
    return responses


if __name__ == "__main__":
    grpc_server = UserDefinedSinkServicer(udsink_handler, sock_path="/tmp/numaflow-test.sock")
    grpc_server.start()

Result

client test output

=== RUN   Test_server_sink
=== RUN   Test_server_sink/server_sink
--- PASS: Test_server_sink (0.00s)
    --- PASS: Test_server_sink/server_sink (0.00s)
PASS

Process finished with the exit code 0

server udsink output

GRPC Server listening on: unix:///tmp/numaflow-test.sock with max threads: 64
User Defined Sink test_id_0 sink_message_success
User Defined Sink test_id_1 sink_message_success

jyu6 added 4 commits December 12, 2022 16:08
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
@codecov
Copy link

codecov bot commented Dec 13, 2022

Codecov Report

Merging #34 (f11a3c4) into main (ad55710) will not change coverage.
The diff coverage is 100.00%.

@@           Coverage Diff           @@
##             main      #34   +/-   ##
=======================================
  Coverage   98.38%   98.38%           
=======================================
  Files           9        9           
  Lines         247      247           
  Branches       14       14           
=======================================
  Hits          243      243           
  Misses          2        2           
  Partials        2        2           
Impacted Files Coverage Δ
pynumaflow/sink/server.py 95.12% <100.00%> (ø)

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

jyu6 added 3 commits December 13, 2022 15:00
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
@jy4096 jy4096 marked this pull request as ready for review December 14, 2022 23:36
jyu6 added 2 commits December 14, 2022 15:39
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Copy link
Member

@ab93 ab93 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's fix the lint/format issues.
Also, this change applies to only udsinks right?

@ab93
Copy link
Member

ab93 commented Dec 14, 2022

@chromevoid to fix the lint issue, you might have to change the .flake8 file, and make it like this: https://github.com/numaproj/numalogic/blob/main/.flake8

@jy4096
Copy link
Contributor Author

jy4096 commented Dec 14, 2022

Let's fix the lint/format issues. Also, this change applies to only udsinks right?

Yes only udsink is updated from unary rpc to streaming rpc.

vigith and others added 3 commits December 14, 2022 16:12
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
…ython into udsink-grpc-stream

Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Copy link
Member

@ab93 ab93 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vigith I think it might be a better idea to fix the examples once v0.2.7 is released. Otherwise the example codes will fail for the time being. What do you think?

jyu6 added 2 commits December 14, 2022 16:24
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
@vigith vigith merged commit e717b1b into numaproj:main Dec 15, 2022
@jy4096 jy4096 deleted the udsink-grpc-stream branch December 20, 2022 21:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants