diff --git a/docs/grpc.md b/docs/grpc.md index ac4386e..3f14c40 100644 --- a/docs/grpc.md +++ b/docs/grpc.md @@ -369,10 +369,53 @@ For streaming responses, messages are separated by blank lines in the output. Fo `fetch` reports gRPC status errors from response trailers. If the server returns a non-OK gRPC status (e.g., `INTERNAL`, `NOT_FOUND`), the error is printed to stderr and the exit code is set to 1. +## Client Streaming + +`fetch` supports client-side streaming gRPC calls. When the proto schema indicates a method is client-streaming, multiple JSON objects in the request body are each converted to a separate protobuf message and sent as individual gRPC frames. + +Detection is automatic via the method descriptor in the proto schema — no additional flags are needed. + +### Inline Data + +Provide multiple JSON objects separated by whitespace: + +```sh +fetch --grpc --proto-file service.proto \ + -d '{"value":"one"}{"value":"two"}{"value":"three"}' \ + https://localhost:50051/pkg.Service/ClientStream +``` + +### NDJSON from File + +```sh +fetch --grpc --proto-file service.proto \ + -d @messages.ndjson \ + https://localhost:50051/pkg.Service/ClientStream +``` + +### Streaming from Stdin + +Pipe data from stdin for real-time streaming — each JSON object is sent as soon as it is parsed: + +```sh +cat messages.ndjson | fetch --grpc --proto-file service.proto \ + -d @- https://localhost:50051/pkg.Service/ClientStream +``` + +## Bidirectional Streaming + +Bidirectional streaming is supported with the same mechanism as client streaming. When piping from stdin, request frames are sent incrementally while response frames are received and displayed concurrently: + +```sh +cat messages.ndjson | fetch --grpc --proto-file service.proto \ + -d @- https://localhost:50051/pkg.Service/BidiStream +``` + +Both directions flow on the same HTTP/2 stream. The response is formatted and displayed as messages arrive, just like server streaming. + ## Limitations -- **Client-side and bidirectional streaming are not supported**: Only unary and server-streaming RPCs are supported -- **Single request message**: Cannot send multiple messages in one request +- **Client/bidi streaming requires a proto schema**: The `--proto-file` or `--proto-desc` flag must be provided so `fetch` can detect that a method is client-streaming - **gRPC-Web**: Standard gRPC protocol only, not gRPC-Web ## See Also diff --git a/integration/integration_test.go b/integration/integration_test.go index f544c1b..81f1d7c 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -37,6 +37,8 @@ import ( "github.com/klauspost/compress/gzip" "github.com/klauspost/compress/zstd" "google.golang.org/protobuf/encoding/protowire" + protoMarshal "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/descriptorpb" ) func TestMain(t *testing.T) { @@ -1103,6 +1105,136 @@ func TestMain(t *testing.T) { assertBufContains(t, res.stderr, "oh no!") }) + t.Run("grpc client streaming", func(t *testing.T) { + // Build a FileDescriptorSet with a client-streaming method. + boolTrue := true + strType := descriptorpb.FieldDescriptorProto_TYPE_STRING + int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64 + fds := &descriptorpb.FileDescriptorSet{ + File: []*descriptorpb.FileDescriptorProto{ + { + Name: strPtr("stream.proto"), + Package: strPtr("streampkg"), + Syntax: strPtr("proto3"), + MessageType: []*descriptorpb.DescriptorProto{ + { + Name: strPtr("StreamRequest"), + Field: []*descriptorpb.FieldDescriptorProto{ + { + Name: strPtr("value"), + Number: int32Ptr(1), + Type: &strType, + }, + }, + }, + { + Name: strPtr("StreamResponse"), + Field: []*descriptorpb.FieldDescriptorProto{ + { + Name: strPtr("count"), + Number: int32Ptr(1), + Type: &int64Type, + }, + }, + }, + }, + Service: []*descriptorpb.ServiceDescriptorProto{ + { + Name: strPtr("StreamService"), + Method: []*descriptorpb.MethodDescriptorProto{ + { + Name: strPtr("ClientStream"), + InputType: strPtr(".streampkg.StreamRequest"), + OutputType: strPtr(".streampkg.StreamResponse"), + ClientStreaming: &boolTrue, + }, + }, + }, + }, + }, + }, + } + + // Serialize the descriptor set to a temp file. + descData, err := protoMarshal.Marshal(fds) + if err != nil { + t.Fatalf("failed to marshal descriptor set: %v", err) + } + descFile := filepath.Join(t.TempDir(), "stream.pb") + if err := os.WriteFile(descFile, descData, 0644); err != nil { + t.Fatalf("failed to write descriptor file: %v", err) + } + + // Server reads gRPC frames from request body, counts them, + // and returns count as a gRPC-framed protobuf response. + server := startServer(func(w http.ResponseWriter, r *http.Request) { + // Count incoming gRPC frames. + var count int + for { + var header [5]byte + _, err := io.ReadFull(r.Body, header[:]) + if err != nil { + break + } + length := binary.BigEndian.Uint32(header[1:5]) + if length > 0 { + buf := make([]byte, length) + _, err = io.ReadFull(r.Body, buf) + if err != nil { + break + } + } + count++ + } + + // Build response: field 1 (count) as varint. + var respData []byte + respData = protowire.AppendTag(respData, 1, protowire.VarintType) + respData = protowire.AppendVarint(respData, uint64(count)) + + // Frame the response. + framedResp := make([]byte, 5+len(respData)) + framedResp[0] = 0 + binary.BigEndian.PutUint32(framedResp[1:5], uint32(len(respData))) + copy(framedResp[5:], respData) + + w.Header().Set("Content-Type", "application/grpc+proto") + w.WriteHeader(200) + w.Write(framedResp) + }) + defer server.Close() + + t.Run("multiple messages", func(t *testing.T) { + data := `{"value":"one"}{"value":"two"}{"value":"three"}` + res := runFetch(t, fetchPath, + server.URL+"/streampkg.StreamService/ClientStream", + "--grpc", "--proto-desc", descFile, + "-d", data, + "--http", "1", "--format", "on") + assertExitCode(t, 0, res) + assertBufContains(t, res.stdout, "3") + }) + + t.Run("single message", func(t *testing.T) { + data := `{"value":"only"}` + res := runFetch(t, fetchPath, + server.URL+"/streampkg.StreamService/ClientStream", + "--grpc", "--proto-desc", descFile, + "-d", data, + "--http", "1", "--format", "on") + assertExitCode(t, 0, res) + assertBufContains(t, res.stdout, "1") + }) + + t.Run("empty stream", func(t *testing.T) { + res := runFetch(t, fetchPath, + server.URL+"/streampkg.StreamService/ClientStream", + "--grpc", "--proto-desc", descFile, + "--http", "1", "--format", "on") + assertExitCode(t, 0, res) + }) + }) + t.Run("proto flags mutual exclusivity", func(t *testing.T) { // proto-file and proto-desc cannot be used together // Create temp files so we get past file existence validation @@ -1933,3 +2065,6 @@ func startMTLSServer(t *testing.T, certPath, keyPath, caCertPath string) *httpte server.StartTLS() return server } + +func strPtr(s string) *string { return &s } +func int32Ptr(i int32) *int32 { return &i } diff --git a/internal/fetch/fetch.go b/internal/fetch/fetch.go index 40466b7..88f24a5 100644 --- a/internal/fetch/fetch.go +++ b/internal/fetch/fetch.go @@ -129,9 +129,10 @@ func fetch(ctx context.Context, r *Request) (int, error) { // 2. Setup gRPC (adds headers, sets HTTP version, finds descriptors). var requestDesc protoreflect.MessageDescriptor + var isClientStreaming bool if r.GRPC { var err error - requestDesc, r.responseDescriptor, err = setupGRPC(r, schema) + requestDesc, r.responseDescriptor, isClientStreaming, err = setupGRPC(r, schema) if err != nil { return 0, err } @@ -200,30 +201,35 @@ func fetch(ctx context.Context, r *Request) (int, error) { } } - // 5. Convert JSON to protobuf AFTER edit. - if requestDesc != nil && req.Body != nil && req.Body != http.NoBody { - // Read the body and convert. - converted, err := convertJSONToProtobuf(req.Body, requestDesc) - if err != nil { - return 0, err - } - req.Body = io.NopCloser(converted) - if req.Header.Get("Content-Type") == "" { - req.Header.Set("Content-Type", "application/protobuf") - } - } - - // 6. Frame gRPC request AFTER conversion. - // gRPC requires framing even for empty messages. + // 5. Convert and frame gRPC request AFTER edit. if r.GRPC { - framed, err := frameGRPCRequest(req.Body) - if err != nil { - return 0, err + if isClientStreaming && requestDesc != nil { + // Client/bidi streaming: stream multiple JSON objects as gRPC frames. + if req.Body != nil && req.Body != http.NoBody { + req.Body = streamGRPCRequest(req.Body, requestDesc) + req.ContentLength = -1 // Unknown length; use chunked encoding. + } else { + // Empty client stream: no frames, just close immediately. + req.Body = http.NoBody + } + } else { + // Unary / server-streaming: existing single-message path. + if requestDesc != nil && req.Body != nil && req.Body != http.NoBody { + converted, err := convertJSONToProtobuf(req.Body, requestDesc) + if err != nil { + return 0, err + } + req.Body = io.NopCloser(converted) + } + framed, err := frameGRPCRequest(req.Body) + if err != nil { + return 0, err + } + req.Body = io.NopCloser(framed) } - req.Body = io.NopCloser(framed) } - // 7. Print request metadata / dry-run. + // 6. Print request metadata / dry-run. if r.Verbosity >= core.VExtraVerbose || r.DryRun { errPrinter := r.PrinterHandle.Stderr() printRequestMetadata(errPrinter, req, r.HTTP) @@ -263,7 +269,7 @@ func fetch(ctx context.Context, r *Request) (int, error) { req = req.WithContext(ctx) } - // 8. Make request. + // 7. Make request. code, err := makeRequest(ctx, r, c, req) // Save session cookies after request completes. diff --git a/internal/fetch/proto.go b/internal/fetch/proto.go index b067ead..b5666dd 100644 --- a/internal/fetch/proto.go +++ b/internal/fetch/proto.go @@ -2,6 +2,7 @@ package fetch import ( "bytes" + "encoding/json" "fmt" "io" "net/http" @@ -73,22 +74,24 @@ func parseGRPCPath(urlPath string) (serviceName, methodName string, err error) { } // setupGRPC configures request for gRPC protocol. -// Returns headers to add, HTTP version, and request/response descriptors. -func setupGRPC(r *Request, schema *proto.Schema) (protoreflect.MessageDescriptor, protoreflect.MessageDescriptor, error) { +// Returns request/response descriptors, whether the method is client-streaming, and any error. +func setupGRPC(r *Request, schema *proto.Schema) (protoreflect.MessageDescriptor, protoreflect.MessageDescriptor, bool, error) { var requestDesc, responseDesc protoreflect.MessageDescriptor + var isClientStreaming bool if schema != nil && r.URL != nil { serviceName, methodName, err := parseGRPCPath(r.URL.Path) if err != nil { - return nil, nil, err + return nil, nil, false, err } fullMethod := serviceName + "/" + methodName method, err := schema.FindMethod(fullMethod) if err != nil { - return nil, nil, err + return nil, nil, false, err } requestDesc = method.Input() responseDesc = method.Output() + isClientStreaming = method.IsStreamingClient() } if r.HTTP == core.HTTPDefault { @@ -100,7 +103,7 @@ func setupGRPC(r *Request, schema *proto.Schema) (protoreflect.MessageDescriptor r.Headers = append(r.Headers, fetchgrpc.Headers()...) r.Headers = append(r.Headers, fetchgrpc.AcceptHeader()) - return requestDesc, responseDesc, nil + return requestDesc, responseDesc, isClientStreaming, nil } // convertJSONToProtobuf converts JSON body to protobuf. @@ -136,3 +139,35 @@ func frameGRPCRequest(data io.Reader) (io.Reader, error) { framedData := fetchgrpc.Frame(rawData, false) return bytes.NewReader(framedData), nil } + +// streamGRPCRequest reads JSON objects from data, converts each to protobuf, +// frames each as a gRPC message, and streams them through an io.Pipe. +// Returns an io.ReadCloser to use as the request body. +func streamGRPCRequest(data io.Reader, desc protoreflect.MessageDescriptor) io.ReadCloser { + pr, pw := io.Pipe() + go func() { + defer pw.Close() + decoder := json.NewDecoder(data) + for { + var raw json.RawMessage + err := decoder.Decode(&raw) + if err == io.EOF { + return + } + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to decode JSON message: %w", err)) + return + } + protoData, err := proto.JSONToProtobuf(raw, desc) + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to convert JSON to protobuf: %w", err)) + return + } + frame := fetchgrpc.Frame(protoData, false) + if _, err := pw.Write(frame); err != nil { + return // pipe closed by reader + } + } + }() + return pr +} diff --git a/internal/fetch/proto_test.go b/internal/fetch/proto_test.go new file mode 100644 index 0000000..32fc489 --- /dev/null +++ b/internal/fetch/proto_test.go @@ -0,0 +1,159 @@ +package fetch + +import ( + "bytes" + "io" + "strings" + "testing" + + fetchgrpc "github.com/ryanfowler/fetch/internal/grpc" + "github.com/ryanfowler/fetch/internal/proto" + + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/descriptorpb" +) + +func TestStreamGRPCRequest(t *testing.T) { + desc := testMessageDescriptor(t) + + t.Run("single message", func(t *testing.T) { + input := `{"name":"hello"}` + rc := streamGRPCRequest(strings.NewReader(input), desc) + defer rc.Close() + + frames := readAllFrames(t, rc) + if len(frames) != 1 { + t.Fatalf("expected 1 frame, got %d", len(frames)) + } + }) + + t.Run("multiple messages", func(t *testing.T) { + input := `{"name":"one"}{"name":"two"}{"name":"three"}` + rc := streamGRPCRequest(strings.NewReader(input), desc) + defer rc.Close() + + frames := readAllFrames(t, rc) + if len(frames) != 3 { + t.Fatalf("expected 3 frames, got %d", len(frames)) + } + }) + + t.Run("ndjson style", func(t *testing.T) { + input := "{\"name\":\"one\"}\n{\"name\":\"two\"}\n{\"name\":\"three\"}\n" + rc := streamGRPCRequest(strings.NewReader(input), desc) + defer rc.Close() + + frames := readAllFrames(t, rc) + if len(frames) != 3 { + t.Fatalf("expected 3 frames, got %d", len(frames)) + } + }) + + t.Run("empty input", func(t *testing.T) { + rc := streamGRPCRequest(strings.NewReader(""), desc) + defer rc.Close() + + data, err := io.ReadAll(rc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(data) != 0 { + t.Fatalf("expected empty output, got %d bytes", len(data)) + } + }) + + t.Run("invalid json", func(t *testing.T) { + rc := streamGRPCRequest(strings.NewReader("{invalid"), desc) + defer rc.Close() + + _, err := io.ReadAll(rc) + if err == nil { + t.Fatal("expected error for invalid JSON") + } + if !strings.Contains(err.Error(), "failed to decode JSON message") { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("whitespace between objects", func(t *testing.T) { + input := " {\"name\":\"one\"} \n\n {\"name\":\"two\"} " + rc := streamGRPCRequest(strings.NewReader(input), desc) + defer rc.Close() + + frames := readAllFrames(t, rc) + if len(frames) != 2 { + t.Fatalf("expected 2 frames, got %d", len(frames)) + } + }) +} + +// testMessageDescriptor builds a simple protobuf message descriptor for testing. +func testMessageDescriptor(t *testing.T) protoreflect.MessageDescriptor { + t.Helper() + + strType := descriptorpb.FieldDescriptorProto_TYPE_STRING + int64Type := descriptorpb.FieldDescriptorProto_TYPE_INT64 + fds := &descriptorpb.FileDescriptorSet{ + File: []*descriptorpb.FileDescriptorProto{ + { + Name: strp("test.proto"), + Package: strp("testpkg"), + Syntax: strp("proto3"), + MessageType: []*descriptorpb.DescriptorProto{ + { + Name: strp("TestMessage"), + Field: []*descriptorpb.FieldDescriptorProto{ + { + Name: strp("id"), + Number: int32p(1), + Type: &int64Type, + }, + { + Name: strp("name"), + Number: int32p(2), + Type: &strType, + }, + }, + }, + }, + }, + }, + } + + schema, err := proto.LoadFromDescriptorSet(fds) + if err != nil { + t.Fatalf("failed to load descriptor set: %v", err) + } + md, err := schema.FindMessage("testpkg.TestMessage") + if err != nil { + t.Fatalf("failed to find message: %v", err) + } + return md +} + +// readAllFrames reads all gRPC frames from a reader. +func readAllFrames(t *testing.T, r io.Reader) [][]byte { + t.Helper() + + data, err := io.ReadAll(r) + if err != nil { + t.Fatalf("failed to read all data: %v", err) + } + + var frames [][]byte + reader := bytes.NewReader(data) + for { + frame, _, err := fetchgrpc.ReadFrame(reader) + if err == io.EOF { + break + } + if err != nil { + t.Fatalf("failed to read frame: %v", err) + } + frames = append(frames, frame) + } + return frames +} + +func strp(s string) *string { return &s } +func int32p(i int32) *int32 { return &i }