forked from agoda-com/opentelemetry-logs-go
/
otlptest.go
118 lines (104 loc) · 3.34 KB
/
otlptest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package otlplogstest
import (
"context"
"testing"
"time"
"github.com/landmaj/opentelemetry-logs-go/exporters/otlp/otlplogs"
"github.com/landmaj/opentelemetry-logs-go/logs"
sdklogs "github.com/landmaj/opentelemetry-logs-go/sdk/logs"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
)
// RunEndToEndTest can be used by otlplogs.Client tests to validate
// themselves.
func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlplogs.Exporter, logsCollector LogsCollector) {
pOpts := []sdklogs.LoggerProviderOption{
sdklogs.WithBatcher(
exp,
// add following two options to ensure flush
sdklogs.WithBatchTimeout(5*time.Second),
sdklogs.WithMaxExportBatchSize(10),
),
}
tp1 := sdklogs.NewLoggerProvider(append(pOpts,
sdklogs.WithResource(resource.NewSchemaless(
attribute.String("rk1", "rv11)"),
attribute.Int64("rk2", 5),
)))...)
tp2 := sdklogs.NewLoggerProvider(append(pOpts,
sdklogs.WithResource(resource.NewSchemaless(
attribute.String("rk1", "rv12)"),
attribute.Float64("rk3", 6.5),
)))...)
tr1 := tp1.Logger("test-logger1")
tr2 := tp2.Logger("test-logger2")
// Now create few logs
m := 4
body := "TestLog"
for i := 0; i < m; i++ {
lr1 := logs.NewLogRecord(logs.LogRecordConfig{
Body: &body,
Attributes: &[]attribute.KeyValue{attribute.Int64("i", int64(i))},
})
tr1.Emit(lr1)
lr2 := logs.NewLogRecord(logs.LogRecordConfig{
Body: &body,
Attributes: &[]attribute.KeyValue{attribute.Int64("i", int64(i))},
})
tr2.Emit(lr2)
}
func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := tp1.Shutdown(ctx); err != nil {
t.Fatalf("failed to shut down a logger provider 1: %v", err)
}
if err := tp2.Shutdown(ctx); err != nil {
t.Fatalf("failed to shut down a logger provider 2: %v", err)
}
}()
// Wait >2 cycles.
<-time.After(40 * time.Millisecond)
// Now shutdown the exporter
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to stop the exporter: %v", err)
}
// Shutdown the collector too so that we can begin
// verification checks of expected data back.
if err := logsCollector.Stop(); err != nil {
t.Fatalf("failed to stop the mock collector: %v", err)
}
// Now verify that we only got two resources
rss := logsCollector.GetResourceLogs()
if got, want := len(rss), 2; got != want {
t.Fatalf("resource log count: got %d, want %d\n", got, want)
}
// Now verify logs and attributes for each resource log.
for _, rs := range rss {
if len(rs.ScopeLogs) == 0 {
t.Fatalf("zero ScopeLogs")
}
if got, want := len(rs.ScopeLogs[0].LogRecords), m; got != want {
t.Fatalf("log counts: got %d, want %d", got, want)
}
attrMap := map[int64]bool{}
for _, s := range rs.ScopeLogs[0].LogRecords {
if gotName, want := s.Body.GetStringValue(), "TestLog"; gotName != want {
t.Fatalf("log name: got %s, want %s", gotName, want)
}
attrMap[s.Attributes[0].Value.Value.(*commonpb.AnyValue_IntValue).IntValue] = true
}
if got, want := len(attrMap), m; got != want {
t.Fatalf("log attribute unique values: got %d want %d", got, want)
}
for i := 0; i < m; i++ {
_, ok := attrMap[int64(i)]
if !ok {
t.Fatalf("log with attribute %d missing", i)
}
}
}
}