Skip to content

Commit

Permalink
Allow stoting tags as object fields for better kibana support
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Aug 29, 2018
1 parent 559d8ca commit 6bb7f85
Show file tree
Hide file tree
Showing 28 changed files with 1,299 additions and 224 deletions.
39 changes: 0 additions & 39 deletions model/converter/json/domain_span_compare_test.go

This file was deleted.

32 changes: 17 additions & 15 deletions model/json/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,20 @@ type Trace struct {
// When converting to ES model, ProcessID and Warnings should be omitted. Even if
// included, ES with dynamic settings off will automatically ignore unneeded fields.
type Span struct {
TraceID TraceID `json:"traceID"`
SpanID SpanID `json:"spanID"`
ParentSpanID SpanID `json:"parentSpanID,omitempty"` // deprecated
Flags uint32 `json:"flags,omitempty"`
OperationName string `json:"operationName"`
References []Reference `json:"references"`
StartTime uint64 `json:"startTime"` // microseconds since Unix epoch
Duration uint64 `json:"duration"` // microseconds
Tags []KeyValue `json:"tags"`
Logs []Log `json:"logs"`
ProcessID ProcessID `json:"processID,omitempty"`
Process *Process `json:"process,omitempty"`
Warnings []string `json:"warnings"`
TraceID TraceID `json:"traceID"`
SpanID SpanID `json:"spanID"`
ParentSpanID SpanID `json:"parentSpanID,omitempty"` // deprecated
Flags uint32 `json:"flags,omitempty"`
OperationName string `json:"operationName"`
References []Reference `json:"references"`
StartTime uint64 `json:"startTime"` // microseconds since Unix epoch
Duration uint64 `json:"duration"` // microseconds
Tags []KeyValue `json:"tags"`
TagsMap map[string]interface{} `json:"tagsMap,omitempty"`
Logs []Log `json:"logs"`
ProcessID ProcessID `json:"processID,omitempty"`
Process *Process `json:"process,omitempty"`
Warnings []string `json:"warnings"`
}

// Reference is a reference from one span to another
Expand All @@ -85,8 +86,9 @@ type Reference struct {

// Process is the process emitting a set of spans
type Process struct {
ServiceName string `json:"serviceName"`
Tags []KeyValue `json:"tags"`
ServiceName string `json:"serviceName"`
Tags []KeyValue `json:"tags"`
TagsMap map[string]interface{} `json:"tagsMap,omitempty"`
}

// Log is a log emitted in a span
Expand Down
14 changes: 14 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Configuration struct {
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
TagsFilePath string
AllTagsAsObject bool
}

// ClientBuilder creates new es.Client
Expand All @@ -51,6 +53,8 @@ type ClientBuilder interface {
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
GetIndexPrefix() string
GetTagsFilePath() string
GetAllTagsAsObject() bool
}

// NewClient creates a new ElasticSearch client
Expand Down Expand Up @@ -165,6 +169,16 @@ func (c *Configuration) GetIndexPrefix() string {
return c.IndexPrefix
}

// GetTagsFilePath returns a path to file containing tag keys
func (c *Configuration) GetTagsFilePath() string {
return c.TagsFilePath
}

// GetAllTagsAsObject returns true if all tags should be stored as object fields
func (c *Configuration) GetAllTagsAsObject() bool {
return c.AllTagsAsObject
}

// GetConfigs wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) GetConfigs() []elastic.ClientOptionFunc {
options := make([]elastic.ClientOptionFunc, 3)
Expand Down
38 changes: 37 additions & 1 deletion plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package es

import (
"bufio"
"flag"
"os"
"path/filepath"
"strings"

"github.com/spf13/viper"
"github.com/uber/jaeger-lib/metrics"
Expand Down Expand Up @@ -80,10 +84,42 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
cfg := f.primaryConfig
return esSpanStore.NewSpanWriter(f.primaryClient, f.logger, f.metricsFactory, cfg.GetNumShards(), cfg.GetNumReplicas(), cfg.GetIndexPrefix()), nil
var tags []string
if cfg.GetTagsFilePath() != "" {
var err error
if tags, err = loadTagsFromFile(cfg.GetTagsFilePath()); err != nil {
f.logger.Error("Could not open file with tags", zap.Error(err))
return nil, err
}
}
p := esSpanStore.SpanWriterParams{Client: f.primaryClient,
Logger: f.logger,
MetricsFactory: f.metricsFactory,
NumShards: f.primaryConfig.GetNumShards(),
NumReplicas: f.primaryConfig.GetNumReplicas(),
IndexPrefix: f.primaryConfig.GetIndexPrefix(), AllTags: f.primaryConfig.GetAllTagsAsObject(), ObjectTags: tags}
return esSpanStore.NewSpanWriter(p), nil
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix()), nil
}

func loadTagsFromFile(filePath string) ([]string, error) {
file, err := os.Open(filepath.Clean(filePath))
if err != nil {
return nil, err
}
defer file.Close()

scanner := bufio.NewScanner(file)
var tags []string
for scanner.Scan() {
line := scanner.Text()
if tag := strings.TrimSpace(line); tag != "" {
tags = append(tags, tag)
}
}
return tags, nil
}
43 changes: 43 additions & 0 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

Expand Down Expand Up @@ -66,3 +67,45 @@ func TestElasticsearchFactory(t *testing.T) {
_, err = f.CreateDependencyReader()
assert.NoError(t, err)
}

func TestElasticsearchTagsFileDoNotExist(t *testing.T) {
f := NewFactory()
mockConf := &mockClientBuilder{}
mockConf.TagsFilePath = "fixtures/tags_foo.txt"
f.primaryConfig = mockConf
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
r, err := f.CreateSpanWriter()
require.Error(t, err)
assert.Nil(t, r)
}

func TestLoadTagsFromFile(t *testing.T) {
tests := []struct {
path string
tags []string
error bool
}{
{
path: "fixtures/do_not_exists.txt",
error: true,
},
{
path: "fixtures/tags_01.txt",
tags: []string{"foo", "bar", "space"},
},
{
path: "fixtures/tags_02.txt",
tags: nil,
},
}

for _, test := range tests {
tags, err := loadTagsFromFile(test.path)
if test.error {
require.Error(t, err)
assert.Nil(t, tags)
} else {
assert.Equal(t, test.tags, tags)
}
}
}
3 changes: 3 additions & 0 deletions plugin/storage/es/fixtures/tags_01.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
foo
bar
space
4 changes: 4 additions & 0 deletions plugin/storage/es/fixtures/tags_02.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@




13 changes: 13 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixIndexPrefix = ".index-prefix"
suffixTagsFile = ".tags-file-path"
suffixAllTagsAsObject = ".allTagsAsObject"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -146,6 +148,15 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixIndexPrefix,
nsConfig.IndexPrefix,
"Optional prefix of Jaeger indices. For example \"production\" creates \"production:jaeger-*\".")
flagSet.String(
nsConfig.namespace+suffixTagsFile,
nsConfig.TagsFilePath,
"Path to a file containing tag keys which will be stored as object fields. Each key should be on a separate line. If the file contains only \"*\""+
" it will store all tags as object fields.")
flagSet.Bool(
nsConfig.namespace+suffixAllTagsAsObject,
nsConfig.AllTagsAsObject,
"Store all span and process tags as object fields. If true "+suffixTagsFile+" is ignored.")
}

// InitFromViper initializes Options with properties from viper
Expand All @@ -169,6 +180,8 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions)
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
cfg.TagsFilePath = v.GetString(cfg.namespace + suffixTagsFile)
cfg.AllTagsAsObject = v.GetBool(cfg.namespace + suffixAllTagsAsObject)
}

// GetPrimary returns primary configuration.
Expand Down
89 changes: 89 additions & 0 deletions plugin/storage/es/spanstore/dbmodel/fixtures/domain_01.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
{
"traceId": "AAAAAAAAAAAAAAAAAAAAAQ==",
"spanId": "AAAAAAAAAAI=",
"operationName": "test-general-conversion",
"references": [
{
"refType": "CHILD_OF",
"traceId": "AAAAAAAAAAAAAAAAAAAAAQ==",
"spanId": "AAAAAAAAAAM="
},
{
"refType": "FOLLOWS_FROM",
"traceId": "AAAAAAAAAAAAAAAAAAAAAQ==",
"spanId": "AAAAAAAAAAQ="
},
{
"refType": "CHILD_OF",
"traceId": "AAAAAAAAAAAAAAAAAAAA/w==",
"spanId": "AAAAAAAAAP8="
}
],
"flags": 1,
"startTime": "2017-01-26T16:46:31.639875-05:00",
"duration": "5000ns",
"tags": [
{
"key": "peer.service",
"vType": "STRING",
"vStr": "service-y"
},
{
"key": "peer.ipv4",
"vType": "INT64",
"vInt64": 23456
},
{
"key": "error",
"vType": "BOOL",
"vBool": true
},
{
"key": "temperature",
"vType": "FLOAT64",
"vFloat64": 72.5
},
{
"key": "blob",
"vType": "BINARY",
"vBinary": "AAAwOQ=="
}
],
"logs": [
{
"timestamp": "2017-01-26T16:46:31.639875-05:00",
"fields": [
{
"key": "event",
"vType": "INT64",
"vInt64": 123415
}
]
},
{
"timestamp": "2017-01-26T16:46:31.639875-05:00",
"fields": [
{
"key": "x",
"vType": "STRING",
"vStr": "y"
}
]
}
],
"process": {
"serviceName": "service-x",
"tags": [
{
"key": "peer.ipv4",
"vType": "INT64",
"vInt64": 23456
},
{
"key": "error",
"vType": "BOOL",
"vBool": true
}
]
}
}
Loading

0 comments on commit 6bb7f85

Please sign in to comment.