Skip to content

Commit

Permalink
Add promtail receiver (#18072)
Browse files Browse the repository at this point in the history
* [pkg/translator/loki] use loki/pkg/push dependency, remove logproto copy

* [receiver/promtail] bring back promtailreceiver
  • Loading branch information
mar4uk authored Jan 31, 2023
1 parent 3771eee commit 5df98e5
Show file tree
Hide file tree
Showing 28 changed files with 1,072 additions and 15,442 deletions.
16 changes: 16 additions & 0 deletions .chloggen/add-promtail-receiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: promtailreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds promtailreceiver to the `internal/components`

# One or more tracking issues related to the change
issues: [17082]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
68 changes: 64 additions & 4 deletions cmd/configschema/go.mod

Large diffs are not rendered by default.

396 changes: 388 additions & 8 deletions cmd/configschema/go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ require (
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/gosnmp/gosnmp v1.35.0 // indirect
github.com/grafana/loki/pkg/push v0.0.0-20230127072203-4e8cc8d71928 // indirect
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd // indirect
github.com/grobie/gomemcache v0.0.0-20180201122607-1f779c573665 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions exporter/lokiexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.18
require (
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/grafana/loki/pkg/push v0.0.0-20230127072203-4e8cc8d71928
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.70.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.70.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.70.0
Expand All @@ -22,14 +23,13 @@ require (

require (
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/knadh/koanf v1.5.0 // indirect
Expand All @@ -39,7 +39,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/prometheus v0.41.0 // indirect
github.com/rs/cors v1.8.3 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/featuregate v0.70.0 // indirect
Expand All @@ -48,6 +47,7 @@ require (
go.opentelemetry.io/otel/metric v0.34.0 // indirect
go.opentelemetry.io/otel/trace v1.12.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/goleak v1.2.0 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions exporter/lokiexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 13 additions & 13 deletions exporter/lokiexporter/legacy_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/grafana/loki/pkg/push"
"github.com/prometheus/common/model"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
Expand All @@ -41,7 +42,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter/internal/tenant"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki/logproto"
)

const (
Expand All @@ -53,7 +53,7 @@ type lokiExporter struct {
settings component.TelemetrySettings
client *http.Client
wg sync.WaitGroup
convert func(plog.LogRecord, pcommon.Resource, pcommon.InstrumentationScope) (*logproto.Entry, error)
convert func(plog.LogRecord, pcommon.Resource, pcommon.InstrumentationScope) (*push.Entry, error)
tenantSource tenant.Source
}

Expand Down Expand Up @@ -199,10 +199,10 @@ func (l *lokiExporter) stop(context.Context) (err error) {
return nil
}

func (l *lokiExporter) logDataToLoki(ld plog.Logs) (pr *logproto.PushRequest, numDroppedLogs int) {
func (l *lokiExporter) logDataToLoki(ld plog.Logs) (pr *push.PushRequest, numDroppedLogs int) {
var errs error

streams := make(map[string]*logproto.Stream)
streams := make(map[string]*push.Stream)
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
ills := rls.At(i).ScopeLogs()
Expand All @@ -224,7 +224,7 @@ func (l *lokiExporter) logDataToLoki(ld plog.Logs) (pr *logproto.PushRequest, nu
mergedLabels = mergedLabels.Merge(recordLabels)

labels := mergedLabels.String()
var entry *logproto.Entry
var entry *push.Entry
var err error
entry, err = l.convert(log, resource, scope)
if err != nil {
Expand All @@ -248,9 +248,9 @@ func (l *lokiExporter) logDataToLoki(ld plog.Logs) (pr *logproto.PushRequest, nu
continue
}

streams[labels] = &logproto.Stream{
streams[labels] = &push.Stream{
Labels: labels,
Entries: []logproto.Entry{*entry},
Entries: []push.Entry{*entry},
}
}
}
Expand All @@ -260,8 +260,8 @@ func (l *lokiExporter) logDataToLoki(ld plog.Logs) (pr *logproto.PushRequest, nu
l.settings.Logger.Debug("some logs has been dropped", zap.Error(errs))
}

pr = &logproto.PushRequest{
Streams: make([]logproto.Stream, len(streams)),
pr = &push.PushRequest{
Streams: make([]push.Stream, len(streams)),
}

i := 0
Expand Down Expand Up @@ -327,7 +327,7 @@ func (l *lokiExporter) convertRecordAttributesToLabels(log plog.LogRecord) model
return ls
}

func (l *lokiExporter) convertLogBodyToEntry(lr plog.LogRecord, res pcommon.Resource, scope pcommon.InstrumentationScope) (*logproto.Entry, error) {
func (l *lokiExporter) convertLogBodyToEntry(lr plog.LogRecord, res pcommon.Resource, scope pcommon.InstrumentationScope) (*push.Entry, error) {
var b strings.Builder

if _, ok := l.config.Labels.RecordAttributes["severity"]; !ok && len(lr.SeverityText()) > 0 {
Expand Down Expand Up @@ -395,18 +395,18 @@ func (l *lokiExporter) convertLogBodyToEntry(lr plog.LogRecord, res pcommon.Reso

b.WriteString(lr.Body().Str())

return &logproto.Entry{
return &push.Entry{
Timestamp: timestampFromLogRecord(lr),
Line: b.String(),
}, nil
}

func (l *lokiExporter) convertLogToJSONEntry(lr plog.LogRecord, res pcommon.Resource, scope pcommon.InstrumentationScope) (*logproto.Entry, error) {
func (l *lokiExporter) convertLogToJSONEntry(lr plog.LogRecord, res pcommon.Resource, scope pcommon.InstrumentationScope) (*push.Entry, error) {
line, err := loki.Encode(lr, res, scope)
if err != nil {
return nil, err
}
return &logproto.Entry{
return &push.Entry{
Timestamp: timestampFromLogRecord(lr),
Line: line,
}, nil
Expand Down
20 changes: 10 additions & 10 deletions exporter/lokiexporter/legacy_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/golang/snappy"
"github.com/grafana/loki/pkg/push"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -39,7 +40,6 @@ import (
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter/internal/tenant"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki/logproto"
)

const (
Expand Down Expand Up @@ -376,7 +376,7 @@ func TestExporter_logDataToLoki(t *testing.T) {
lr.SetTimestamp(ts)

pr, numDroppedLogs := exp.logDataToLoki(logs)
expectedPr := &logproto.PushRequest{Streams: []logproto.Stream{}}
expectedPr := &push.PushRequest{Streams: []push.Stream{}}
require.Equal(t, 1, numDroppedLogs)
require.Equal(t, expectedPr, pr)
})
Expand Down Expand Up @@ -462,7 +462,7 @@ func TestExporter_logDataToLoki(t *testing.T) {
lri.SetTimestamp(ts)

pr, numDroppedLogs := exp.logDataToLoki(logs)
expectedPr := &logproto.PushRequest{Streams: []logproto.Stream{}}
expectedPr := &push.PushRequest{Streams: []push.Stream{}}
require.Equal(t, 1, numDroppedLogs)
require.Equal(t, expectedPr, pr)
})
Expand Down Expand Up @@ -591,7 +591,7 @@ func TestExporter_convertLogBodyToEntry(t *testing.T) {
}, componenttest.NewNopTelemetrySettings())
entry, _ := exp.convertLogBodyToEntry(lr, res, scope)

expEntry := &logproto.Entry{
expEntry := &push.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Line: "severity=DEBUG severityN=5 traceID=01020304000000000000000000000000 spanID=0506070800000000 host.name=something instrumentation_scope_name=example-logger-name instrumentation_scope_version=v1 Payment succeeded",
}
Expand All @@ -615,16 +615,16 @@ func TestExporter_encode(t *testing.T) {
labels := model.LabelSet{
model.LabelName("container_name"): model.LabelValue("mycontainer"),
}
entry := &logproto.Entry{
entry := &push.Entry{
Timestamp: time.Now(),
Line: "log message",
}
stream := logproto.Stream{
stream := push.Stream{
Labels: labels.String(),
Entries: []logproto.Entry{*entry},
Entries: []push.Entry{*entry},
}
pr := &logproto.PushRequest{
Streams: []logproto.Stream{stream},
pr := &push.PushRequest{
Streams: []push.Stream{stream},
}

req, err := encode(pr)
Expand Down Expand Up @@ -701,7 +701,7 @@ func TestExporter_convertLogtoJSONEntry(t *testing.T) {

exp := newLegacyExporter(&Config{}, componenttest.NewNopTelemetrySettings())
entry, err := exp.convertLogToJSONEntry(lr, res, scope)
expEntry := &logproto.Entry{
expEntry := &push.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Line: `{"body":"log message","resources":{"host.name":"something"},"instrumentation_scope":{"name":"example-logger-name","version":"v1"}}`,
}
Expand Down
9 changes: 4 additions & 5 deletions exporter/lokiexporter/next_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ import (

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/grafana/loki/pkg/push"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki/logproto"
)

func TestPushLogData(t *testing.T) {
Expand Down Expand Up @@ -70,7 +69,7 @@ func TestPushLogData(t *testing.T) {
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
actualPushRequest := &logproto.PushRequest{}
actualPushRequest := &push.PushRequest{}

// prepare
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -246,7 +245,7 @@ func TestLogsToLokiRequestWithGroupingByTenant(t *testing.T) {
}
for _, tC := range tests {
t.Run(tC.desc, func(t *testing.T) {
actualPushRequestPerTenant := map[string]*logproto.PushRequest{}
actualPushRequestPerTenant := map[string]*push.PushRequest{}

// prepare
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -256,7 +255,7 @@ func TestLogsToLokiRequestWithGroupingByTenant(t *testing.T) {
decPayload, err := snappy.Decode(nil, encPayload)
require.NoError(t, err)

pr := &logproto.PushRequest{}
pr := &push.PushRequest{}
err = proto.Unmarshal(decPayload, pr)
require.NoError(t, err)

Expand Down
Loading

0 comments on commit 5df98e5

Please sign in to comment.