Skip to content

Commit

Permalink
Add unmarshaler for otlpgrpc.[*]Request and otlpgrp.[*]Response
Browse files Browse the repository at this point in the history
Fixes #4052

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Oct 19, 2021
1 parent f85f13e commit aefb31b
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 70 deletions.
37 changes: 37 additions & 0 deletions model/otlpgrpc/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
)

var jsonMarshaler = &jsonpb.Marshaler{}
var jsonUnmarshaler = &jsonpb.Unmarshaler{}

// LogsResponse represents the response for gRPC client/server.
type LogsResponse struct {
Expand All @@ -38,6 +39,24 @@ func NewLogsResponse() LogsResponse {
return LogsResponse{orig: &otlpcollectorlog.ExportLogsServiceResponse{}}
}

// UnmarshalLogsResponse unmarshalls LogsResponse from proto bytes.
func UnmarshalLogsResponse(data []byte) (LogsResponse, error) {
var orig otlpcollectorlog.ExportLogsServiceResponse
if err := orig.Unmarshal(data); err != nil {
return LogsResponse{}, err
}
return LogsResponse{orig: &orig}, nil
}

// UnmarshalJSONLogsResponse unmarshalls LogsResponse from JSON bytes.
func UnmarshalJSONLogsResponse(data []byte) (LogsResponse, error) {
var orig otlpcollectorlog.ExportLogsServiceResponse
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), &orig); err != nil {
return LogsResponse{}, err
}
return LogsResponse{orig: &orig}, nil
}

// Marshal marshals LogsResponse into proto bytes.
func (lr LogsResponse) Marshal() ([]byte, error) {
return lr.orig.Marshal()
Expand All @@ -62,6 +81,24 @@ func NewLogsRequest() LogsRequest {
return LogsRequest{orig: &otlpcollectorlog.ExportLogsServiceRequest{}}
}

// UnmarshalLogsRequest unmarshalls LogsRequest from proto bytes.
func UnmarshalLogsRequest(data []byte) (LogsRequest, error) {
var orig otlpcollectorlog.ExportLogsServiceRequest
if err := orig.Unmarshal(data); err != nil {
return LogsRequest{}, err
}
return LogsRequest{orig: &orig}, nil
}

// UnmarshalJSONLogsRequest unmarshalls LogsRequest from JSON bytes.
func UnmarshalJSONLogsRequest(data []byte) (LogsRequest, error) {
var orig otlpcollectorlog.ExportLogsServiceRequest
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), &orig); err != nil {
return LogsRequest{}, err
}
return LogsRequest{orig: &orig}, nil
}

// Marshal marshals LogsRequest into proto bytes.
func (lr LogsRequest) Marshal() ([]byte, error) {
return lr.orig.Marshal()
Expand Down
38 changes: 36 additions & 2 deletions model/otlpgrpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)

// TODO: Consider to add `MetricsRequest`. If we add non pdata properties we can add them to the request.

// MetricsResponse represents the response for gRPC client/server.
type MetricsResponse struct {
orig *otlpcollectormetrics.ExportMetricsServiceResponse
Expand All @@ -37,6 +35,24 @@ func NewMetricsResponse() MetricsResponse {
return MetricsResponse{orig: &otlpcollectormetrics.ExportMetricsServiceResponse{}}
}

// UnmarshalMetricsResponse unmarshalls MetricsResponse from proto bytes.
func UnmarshalMetricsResponse(data []byte) (MetricsResponse, error) {
var orig otlpcollectormetrics.ExportMetricsServiceResponse
if err := orig.Unmarshal(data); err != nil {
return MetricsResponse{}, err
}
return MetricsResponse{orig: &orig}, nil
}

// UnmarshalJSONMetricsResponse unmarshalls MetricsResponse from JSON bytes.
func UnmarshalJSONMetricsResponse(data []byte) (MetricsResponse, error) {
var orig otlpcollectormetrics.ExportMetricsServiceResponse
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), &orig); err != nil {
return MetricsResponse{}, err
}
return MetricsResponse{orig: &orig}, nil
}

// Marshal marshals MetricsResponse into proto bytes.
func (mr MetricsResponse) Marshal() ([]byte, error) {
return mr.orig.Marshal()
Expand All @@ -61,6 +77,24 @@ func NewMetricsRequest() MetricsRequest {
return MetricsRequest{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{}}
}

// UnmarshalMetricsRequest unmarshalls MetricsRequest from proto bytes.
func UnmarshalMetricsRequest(data []byte) (MetricsRequest, error) {
var orig otlpcollectormetrics.ExportMetricsServiceRequest
if err := orig.Unmarshal(data); err != nil {
return MetricsRequest{}, err
}
return MetricsRequest{orig: &orig}, nil
}

// UnmarshalJSONMetricsRequest unmarshalls MetricsRequest from JSON bytes.
func UnmarshalJSONMetricsRequest(data []byte) (MetricsRequest, error) {
var orig otlpcollectormetrics.ExportMetricsServiceRequest
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), &orig); err != nil {
return MetricsRequest{}, err
}
return MetricsRequest{orig: &orig}, nil
}

// Marshal marshals MetricsRequest into proto bytes.
func (mr MetricsRequest) Marshal() ([]byte, error) {
return mr.orig.Marshal()
Expand Down
38 changes: 36 additions & 2 deletions model/otlpgrpc/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)

// TODO: Consider to add `TracesRequest`. If we add non pdata properties we can add them to the request.

// TracesResponse represents the response for gRPC client/server.
type TracesResponse struct {
orig *otlpcollectortrace.ExportTraceServiceResponse
Expand All @@ -37,6 +35,24 @@ func NewTracesResponse() TracesResponse {
return TracesResponse{orig: &otlpcollectortrace.ExportTraceServiceResponse{}}
}

// UnmarshalTracesResponse unmarshalls TracesResponse from proto bytes.
func UnmarshalTracesResponse(data []byte) (TracesResponse, error) {
var orig otlpcollectortrace.ExportTraceServiceResponse
if err := orig.Unmarshal(data); err != nil {
return TracesResponse{}, err
}
return TracesResponse{orig: &orig}, nil
}

// UnmarshalJSONTracesResponse unmarshalls TracesResponse from JSON bytes.
func UnmarshalJSONTracesResponse(data []byte) (TracesResponse, error) {
var orig otlpcollectortrace.ExportTraceServiceResponse
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), &orig); err != nil {
return TracesResponse{}, err
}
return TracesResponse{orig: &orig}, nil
}

// Marshal marshals TracesResponse into proto bytes.
func (tr TracesResponse) Marshal() ([]byte, error) {
return tr.orig.Marshal()
Expand All @@ -61,6 +77,24 @@ func NewTracesRequest() TracesRequest {
return TracesRequest{orig: &otlpcollectortrace.ExportTraceServiceRequest{}}
}

// UnmarshalTracesRequest unmarshalls TracesRequest from proto bytes.
func UnmarshalTracesRequest(data []byte) (TracesRequest, error) {
var orig otlpcollectortrace.ExportTraceServiceRequest
if err := orig.Unmarshal(data); err != nil {
return TracesRequest{}, err
}
return TracesRequest{orig: &orig}, nil
}

// UnmarshalJSONTracesRequest unmarshalls TracesRequest from JSON bytes.
func UnmarshalJSONTracesRequest(data []byte) (TracesRequest, error) {
var orig otlpcollectortrace.ExportTraceServiceRequest
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), &orig); err != nil {
return TracesRequest{}, err
}
return TracesRequest{orig: &orig}, nil
}

// Marshal marshals TracesRequest into proto bytes.
func (tr TracesRequest) Marshal() ([]byte, error) {
return tr.orig.Marshal()
Expand Down
63 changes: 8 additions & 55 deletions receiver/otlpreceiver/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/gogo/protobuf/proto"
spb "google.golang.org/genproto/googleapis/rpc/status"

"go.opentelemetry.io/collector/model/otlp"
"go.opentelemetry.io/collector/model/otlpgrpc"
)

Expand All @@ -31,18 +30,8 @@ const (
)

var (
pbEncoder = &protoEncoder{}
jsEncoder = &jsonEncoder{}

tracesPbUnmarshaler = otlp.NewProtobufTracesUnmarshaler()
tracesJSONUnmarshaler = otlp.NewJSONTracesUnmarshaler()

metricsPbUnmarshaler = otlp.NewProtobufMetricsUnmarshaler()
metricsJSONUnmarshaler = otlp.NewJSONMetricsUnmarshaler()

logsPbUnmarshaler = otlp.NewProtobufLogsUnmarshaler()
logsJSONUnmarshaler = otlp.NewJSONLogsUnmarshaler()

pbEncoder = &protoEncoder{}
jsEncoder = &jsonEncoder{}
jsonMarshaler = &jsonpb.Marshaler{}
)

Expand All @@ -63,33 +52,15 @@ type encoder interface {
type protoEncoder struct{}

func (protoEncoder) unmarshalTracesRequest(buf []byte) (otlpgrpc.TracesRequest, error) {
td, err := tracesPbUnmarshaler.UnmarshalTraces(buf)
if err != nil {
return otlpgrpc.TracesRequest{}, err
}
req := otlpgrpc.NewTracesRequest()
req.SetTraces(td)
return req, nil
return otlpgrpc.UnmarshalTracesRequest(buf)
}

func (protoEncoder) unmarshalMetricsRequest(buf []byte) (otlpgrpc.MetricsRequest, error) {
td, err := metricsPbUnmarshaler.UnmarshalMetrics(buf)
if err != nil {
return otlpgrpc.MetricsRequest{}, err
}
req := otlpgrpc.NewMetricsRequest()
req.SetMetrics(td)
return req, nil
return otlpgrpc.UnmarshalMetricsRequest(buf)
}

func (protoEncoder) unmarshalLogsRequest(buf []byte) (otlpgrpc.LogsRequest, error) {
ld, err := logsPbUnmarshaler.UnmarshalLogs(buf)
if err != nil {
return otlpgrpc.LogsRequest{}, err
}
req := otlpgrpc.NewLogsRequest()
req.SetLogs(ld)
return req, nil
return otlpgrpc.UnmarshalLogsRequest(buf)
}

func (protoEncoder) marshalTracesResponse(resp otlpgrpc.TracesResponse) ([]byte, error) {
Expand All @@ -115,33 +86,15 @@ func (protoEncoder) contentType() string {
type jsonEncoder struct{}

func (jsonEncoder) unmarshalTracesRequest(buf []byte) (otlpgrpc.TracesRequest, error) {
td, err := tracesJSONUnmarshaler.UnmarshalTraces(buf)
if err != nil {
return otlpgrpc.TracesRequest{}, err
}
req := otlpgrpc.NewTracesRequest()
req.SetTraces(td)
return req, nil
return otlpgrpc.UnmarshalJSONTracesRequest(buf)
}

func (jsonEncoder) unmarshalMetricsRequest(buf []byte) (otlpgrpc.MetricsRequest, error) {
td, err := metricsJSONUnmarshaler.UnmarshalMetrics(buf)
if err != nil {
return otlpgrpc.MetricsRequest{}, err
}
req := otlpgrpc.NewMetricsRequest()
req.SetMetrics(td)
return req, nil
return otlpgrpc.UnmarshalJSONMetricsRequest(buf)
}

func (jsonEncoder) unmarshalLogsRequest(buf []byte) (otlpgrpc.LogsRequest, error) {
ld, err := logsJSONUnmarshaler.UnmarshalLogs(buf)
if err != nil {
return otlpgrpc.LogsRequest{}, err
}
req := otlpgrpc.NewLogsRequest()
req.SetLogs(ld)
return req, nil
return otlpgrpc.UnmarshalJSONLogsRequest(buf)
}

func (jsonEncoder) marshalTracesResponse(resp otlpgrpc.TracesResponse) ([]byte, error) {
Expand Down
17 changes: 6 additions & 11 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"testing"
"time"

"github.com/gogo/protobuf/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
spb "google.golang.org/genproto/googleapis/rpc/status"
Expand Down Expand Up @@ -179,13 +178,12 @@ func testHTTPJSONRequest(t *testing.T, url string, sink *internalconsumertest.Er

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err, "Error posting trace to grpc-gateway server: %v", err)
require.NoError(t, err, "Error posting trace to http server: %v", err)

respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("Error reading response from trace grpc-gateway, %v", err)
t.Errorf("Error reading response from trace http server, %v", err)
}
respStr := string(respBytes)
err = resp.Body.Close()
if err != nil {
t.Errorf("Error closing response body, %v", err)
Expand All @@ -194,17 +192,16 @@ func testHTTPJSONRequest(t *testing.T, url string, sink *internalconsumertest.Er
allTraces := sink.AllTraces()
if expectedErr == nil {
assert.Equal(t, 200, resp.StatusCode)
var respJSON map[string]interface{}
assert.NoError(t, json.Unmarshal([]byte(respStr), &respJSON))
assert.Len(t, respJSON, 0, "Got unexpected response from trace grpc-gateway")
_, err = otlpgrpc.UnmarshalJSONTracesResponse(respBytes)
assert.NoError(t, err)

require.Len(t, allTraces, 1)

got := allTraces[0]
assert.EqualValues(t, got, traceOtlp)
} else {
errStatus := &spb.Status{}
assert.NoError(t, json.Unmarshal([]byte(respStr), errStatus))
assert.NoError(t, json.Unmarshal(respBytes, errStatus))
if s, ok := status.FromError(expectedErr); ok {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, s.Proto()))
Expand Down Expand Up @@ -326,9 +323,7 @@ func testHTTPProtobufRequest(

if expectedErr == nil {
require.Equal(t, 200, resp.StatusCode, "Unexpected return status")
// TODO: Parse otlp response here instead of empty proto when pdata allows that.
tmp := &types.Empty{}
err := tmp.Unmarshal(respBytes)
_, err := otlpgrpc.UnmarshalTracesResponse(respBytes)
require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto")

require.Len(t, allTraces, 1)
Expand Down

0 comments on commit aefb31b

Please sign in to comment.