Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unmarshaler for otlpgrpc.[*]Request and otlpgrp.[*]Response #4215

Merged
merged 1 commit into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 37 additions & 0 deletions model/otlpgrpc/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
)

var jsonMarshaler = &jsonpb.Marshaler{}
var jsonUnmarshaler = &jsonpb.Unmarshaler{}

// LogsResponse represents the response for gRPC client/server.
type LogsResponse struct {
Expand All @@ -38,6 +39,24 @@ func NewLogsResponse() LogsResponse {
return LogsResponse{orig: &otlpcollectorlog.ExportLogsServiceResponse{}}
}

// UnmarshalLogsResponse unmarshalls LogsResponse from proto bytes.
func UnmarshalLogsResponse(data []byte) (LogsResponse, error) {
var orig otlpcollectorlog.ExportLogsServiceResponse
if err := orig.Unmarshal(data); err != nil {
return LogsResponse{}, err
}
return LogsResponse{orig: &orig}, nil
}

// UnmarshalJSONLogsResponse unmarshalls LogsResponse from JSON bytes.
func UnmarshalJSONLogsResponse(data []byte) (LogsResponse, error) {
var orig otlpcollectorlog.ExportLogsServiceResponse
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), &orig); err != nil {
return LogsResponse{}, err
}
return LogsResponse{orig: &orig}, nil
}

// Marshal marshals LogsResponse into proto bytes.
func (lr LogsResponse) Marshal() ([]byte, error) {
return lr.orig.Marshal()
Expand All @@ -62,6 +81,24 @@ func NewLogsRequest() LogsRequest {
return LogsRequest{orig: &otlpcollectorlog.ExportLogsServiceRequest{}}
}

// UnmarshalLogsRequest unmarshalls LogsRequest from proto bytes.
func UnmarshalLogsRequest(data []byte) (LogsRequest, error) {
var orig otlpcollectorlog.ExportLogsServiceRequest
if err := orig.Unmarshal(data); err != nil {
return LogsRequest{}, err
}
return LogsRequest{orig: &orig}, nil
}

// UnmarshalJSONLogsRequest unmarshalls LogsRequest from JSON bytes.
func UnmarshalJSONLogsRequest(data []byte) (LogsRequest, error) {
var orig otlpcollectorlog.ExportLogsServiceRequest
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), &orig); err != nil {
return LogsRequest{}, err
}
return LogsRequest{orig: &orig}, nil
}

// Marshal marshals LogsRequest into proto bytes.
func (lr LogsRequest) Marshal() ([]byte, error) {
return lr.orig.Marshal()
Expand Down
38 changes: 36 additions & 2 deletions model/otlpgrpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)

// TODO: Consider to add `MetricsRequest`. If we add non pdata properties we can add them to the request.

// MetricsResponse represents the response for gRPC client/server.
type MetricsResponse struct {
orig *otlpcollectormetrics.ExportMetricsServiceResponse
Expand All @@ -37,6 +35,24 @@ func NewMetricsResponse() MetricsResponse {
return MetricsResponse{orig: &otlpcollectormetrics.ExportMetricsServiceResponse{}}
}

// UnmarshalMetricsResponse unmarshalls MetricsResponse from proto bytes.
func UnmarshalMetricsResponse(data []byte) (MetricsResponse, error) {
var orig otlpcollectormetrics.ExportMetricsServiceResponse
if err := orig.Unmarshal(data); err != nil {
return MetricsResponse{}, err
}
return MetricsResponse{orig: &orig}, nil
}

// UnmarshalJSONMetricsResponse unmarshalls MetricsResponse from JSON bytes.
func UnmarshalJSONMetricsResponse(data []byte) (MetricsResponse, error) {
var orig otlpcollectormetrics.ExportMetricsServiceResponse
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), &orig); err != nil {
return MetricsResponse{}, err
}
return MetricsResponse{orig: &orig}, nil
}

// Marshal marshals MetricsResponse into proto bytes.
func (mr MetricsResponse) Marshal() ([]byte, error) {
return mr.orig.Marshal()
Expand All @@ -61,6 +77,24 @@ func NewMetricsRequest() MetricsRequest {
return MetricsRequest{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{}}
}

// UnmarshalMetricsRequest unmarshalls MetricsRequest from proto bytes.
func UnmarshalMetricsRequest(data []byte) (MetricsRequest, error) {
var orig otlpcollectormetrics.ExportMetricsServiceRequest
if err := orig.Unmarshal(data); err != nil {
return MetricsRequest{}, err
}
return MetricsRequest{orig: &orig}, nil
}

// UnmarshalJSONMetricsRequest unmarshalls MetricsRequest from JSON bytes.
func UnmarshalJSONMetricsRequest(data []byte) (MetricsRequest, error) {
var orig otlpcollectormetrics.ExportMetricsServiceRequest
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), &orig); err != nil {
return MetricsRequest{}, err
}
return MetricsRequest{orig: &orig}, nil
}

// Marshal marshals MetricsRequest into proto bytes.
func (mr MetricsRequest) Marshal() ([]byte, error) {
return mr.orig.Marshal()
Expand Down
38 changes: 36 additions & 2 deletions model/otlpgrpc/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)

// TODO: Consider to add `TracesRequest`. If we add non pdata properties we can add them to the request.

// TracesResponse represents the response for gRPC client/server.
type TracesResponse struct {
orig *otlpcollectortrace.ExportTraceServiceResponse
Expand All @@ -37,6 +35,24 @@ func NewTracesResponse() TracesResponse {
return TracesResponse{orig: &otlpcollectortrace.ExportTraceServiceResponse{}}
}

// UnmarshalTracesResponse unmarshalls TracesResponse from proto bytes.
func UnmarshalTracesResponse(data []byte) (TracesResponse, error) {
var orig otlpcollectortrace.ExportTraceServiceResponse
if err := orig.Unmarshal(data); err != nil {
return TracesResponse{}, err
}
return TracesResponse{orig: &orig}, nil
}

// UnmarshalJSONTracesResponse unmarshalls TracesResponse from JSON bytes.
func UnmarshalJSONTracesResponse(data []byte) (TracesResponse, error) {
var orig otlpcollectortrace.ExportTraceServiceResponse
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), &orig); err != nil {
return TracesResponse{}, err
}
return TracesResponse{orig: &orig}, nil
}

// Marshal marshals TracesResponse into proto bytes.
func (tr TracesResponse) Marshal() ([]byte, error) {
return tr.orig.Marshal()
Expand All @@ -61,6 +77,24 @@ func NewTracesRequest() TracesRequest {
return TracesRequest{orig: &otlpcollectortrace.ExportTraceServiceRequest{}}
}

// UnmarshalTracesRequest unmarshalls TracesRequest from proto bytes.
func UnmarshalTracesRequest(data []byte) (TracesRequest, error) {
var orig otlpcollectortrace.ExportTraceServiceRequest
if err := orig.Unmarshal(data); err != nil {
return TracesRequest{}, err
}
return TracesRequest{orig: &orig}, nil
}

// UnmarshalJSONTracesRequest unmarshalls TracesRequest from JSON bytes.
func UnmarshalJSONTracesRequest(data []byte) (TracesRequest, error) {
var orig otlpcollectortrace.ExportTraceServiceRequest
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), &orig); err != nil {
return TracesRequest{}, err
}
return TracesRequest{orig: &orig}, nil
}

// Marshal marshals TracesRequest into proto bytes.
func (tr TracesRequest) Marshal() ([]byte, error) {
return tr.orig.Marshal()
Expand Down
63 changes: 8 additions & 55 deletions receiver/otlpreceiver/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/gogo/protobuf/proto"
spb "google.golang.org/genproto/googleapis/rpc/status"

"go.opentelemetry.io/collector/model/otlp"
"go.opentelemetry.io/collector/model/otlpgrpc"
)

Expand All @@ -31,18 +30,8 @@ const (
)

var (
pbEncoder = &protoEncoder{}
jsEncoder = &jsonEncoder{}

tracesPbUnmarshaler = otlp.NewProtobufTracesUnmarshaler()
tracesJSONUnmarshaler = otlp.NewJSONTracesUnmarshaler()

metricsPbUnmarshaler = otlp.NewProtobufMetricsUnmarshaler()
metricsJSONUnmarshaler = otlp.NewJSONMetricsUnmarshaler()

logsPbUnmarshaler = otlp.NewProtobufLogsUnmarshaler()
logsJSONUnmarshaler = otlp.NewJSONLogsUnmarshaler()

pbEncoder = &protoEncoder{}
jsEncoder = &jsonEncoder{}
jsonMarshaler = &jsonpb.Marshaler{}
)

Expand All @@ -63,33 +52,15 @@ type encoder interface {
type protoEncoder struct{}

func (protoEncoder) unmarshalTracesRequest(buf []byte) (otlpgrpc.TracesRequest, error) {
td, err := tracesPbUnmarshaler.UnmarshalTraces(buf)
if err != nil {
return otlpgrpc.TracesRequest{}, err
}
req := otlpgrpc.NewTracesRequest()
req.SetTraces(td)
return req, nil
return otlpgrpc.UnmarshalTracesRequest(buf)
}

func (protoEncoder) unmarshalMetricsRequest(buf []byte) (otlpgrpc.MetricsRequest, error) {
td, err := metricsPbUnmarshaler.UnmarshalMetrics(buf)
if err != nil {
return otlpgrpc.MetricsRequest{}, err
}
req := otlpgrpc.NewMetricsRequest()
req.SetMetrics(td)
return req, nil
return otlpgrpc.UnmarshalMetricsRequest(buf)
}

func (protoEncoder) unmarshalLogsRequest(buf []byte) (otlpgrpc.LogsRequest, error) {
ld, err := logsPbUnmarshaler.UnmarshalLogs(buf)
if err != nil {
return otlpgrpc.LogsRequest{}, err
}
req := otlpgrpc.NewLogsRequest()
req.SetLogs(ld)
return req, nil
return otlpgrpc.UnmarshalLogsRequest(buf)
}

func (protoEncoder) marshalTracesResponse(resp otlpgrpc.TracesResponse) ([]byte, error) {
Expand All @@ -115,33 +86,15 @@ func (protoEncoder) contentType() string {
type jsonEncoder struct{}

func (jsonEncoder) unmarshalTracesRequest(buf []byte) (otlpgrpc.TracesRequest, error) {
td, err := tracesJSONUnmarshaler.UnmarshalTraces(buf)
if err != nil {
return otlpgrpc.TracesRequest{}, err
}
req := otlpgrpc.NewTracesRequest()
req.SetTraces(td)
return req, nil
return otlpgrpc.UnmarshalJSONTracesRequest(buf)
}

func (jsonEncoder) unmarshalMetricsRequest(buf []byte) (otlpgrpc.MetricsRequest, error) {
td, err := metricsJSONUnmarshaler.UnmarshalMetrics(buf)
if err != nil {
return otlpgrpc.MetricsRequest{}, err
}
req := otlpgrpc.NewMetricsRequest()
req.SetMetrics(td)
return req, nil
return otlpgrpc.UnmarshalJSONMetricsRequest(buf)
}

func (jsonEncoder) unmarshalLogsRequest(buf []byte) (otlpgrpc.LogsRequest, error) {
ld, err := logsJSONUnmarshaler.UnmarshalLogs(buf)
if err != nil {
return otlpgrpc.LogsRequest{}, err
}
req := otlpgrpc.NewLogsRequest()
req.SetLogs(ld)
return req, nil
return otlpgrpc.UnmarshalJSONLogsRequest(buf)
}

func (jsonEncoder) marshalTracesResponse(resp otlpgrpc.TracesResponse) ([]byte, error) {
Expand Down
23 changes: 8 additions & 15 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"testing"
"time"

"github.com/gogo/protobuf/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
spb "google.golang.org/genproto/googleapis/rpc/status"
Expand Down Expand Up @@ -179,13 +178,12 @@ func testHTTPJSONRequest(t *testing.T, url string, sink *internalconsumertest.Er

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err, "Error posting trace to grpc-gateway server: %v", err)
require.NoError(t, err, "Error posting trace to http server: %v", err)

respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("Error reading response from trace grpc-gateway, %v", err)
t.Errorf("Error reading response from trace http server, %v", err)
}
respStr := string(respBytes)
err = resp.Body.Close()
if err != nil {
t.Errorf("Error closing response body, %v", err)
Expand All @@ -194,17 +192,14 @@ func testHTTPJSONRequest(t *testing.T, url string, sink *internalconsumertest.Er
allTraces := sink.AllTraces()
if expectedErr == nil {
assert.Equal(t, 200, resp.StatusCode)
var respJSON map[string]interface{}
assert.NoError(t, json.Unmarshal([]byte(respStr), &respJSON))
assert.Len(t, respJSON, 0, "Got unexpected response from trace grpc-gateway")
_, err = otlpgrpc.UnmarshalJSONTracesResponse(respBytes)
assert.NoError(t, err, "Unable to unmarshal response to TracesResponse")

require.Len(t, allTraces, 1)

got := allTraces[0]
assert.EqualValues(t, got, traceOtlp)
assert.EqualValues(t, allTraces[0], traceOtlp)
} else {
errStatus := &spb.Status{}
assert.NoError(t, json.Unmarshal([]byte(respStr), errStatus))
assert.NoError(t, json.Unmarshal(respBytes, errStatus))
if s, ok := status.FromError(expectedErr); ok {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, s.Proto()))
Expand Down Expand Up @@ -326,10 +321,8 @@ func testHTTPProtobufRequest(

if expectedErr == nil {
require.Equal(t, 200, resp.StatusCode, "Unexpected return status")
// TODO: Parse otlp response here instead of empty proto when pdata allows that.
tmp := &types.Empty{}
err := tmp.Unmarshal(respBytes)
require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto")
_, err = otlpgrpc.UnmarshalTracesResponse(respBytes)
require.NoError(t, err, "Unable to unmarshal response to TracesResponse")

require.Len(t, allTraces, 1)
assert.EqualValues(t, allTraces[0], wantData)
Expand Down