-
Notifications
You must be signed in to change notification settings - Fork 66
/
jsonlines.go
80 lines (67 loc) · 2.45 KB
/
jsonlines.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// Copyright (c) 2018 Yandex LLC. All rights reserved.
// Use of this source code is governed by a MPL 2.0
// license that can be found in the LICENSE file.
// Author: Vladimir Skipor <skipor@yandex-team.ru>
package aggregator
import (
"bufio"
"io"
"github.com/yandex/pandora/lib/ioutil2"
jsoniter "github.com/json-iterator/go"
"github.com/yandex/pandora/core"
"github.com/yandex/pandora/core/config"
"github.com/yandex/pandora/core/coreutil"
)
// JSONLineAggregatorConfig is the configuration accepted by
// NewJSONLinesAggregator. It embeds the generic encoder aggregator settings
// and the JSON Lines encoder settings, both tagged with `squash`.
type JSONLineAggregatorConfig struct {
EncoderAggregatorConfig `config:",squash"`
JSONLineEncoderConfig `config:",squash"`
}
// JSONLineEncoderConfig is the configuration accepted by NewJSONEncoder:
// the jsoniter options and the buffer size settings, both embedded with the
// `squash` tag.
type JSONLineEncoderConfig struct {
JSONIterConfig `config:",squash"`
coreutil.BufferSizeConfig `config:",squash"`
}
// JSONIterConfig is the subset of jsoniter.Config that may be useful to
// configure. NewJSONEncoder maps it onto a jsoniter.Config via config.Map.
type JSONIterConfig struct {
// MarshalFloatWith6Digits makes float marshalling faster.
MarshalFloatWith6Digits bool `config:"marshal-float-with-6-digits"`
// SortMapKeys is useful when a sample contains a map object and you want
// to see its entries in the same order.
SortMapKeys bool `config:"sort-map-keys"`
}
// DefaultJSONLinesAggregatorConfig returns a JSONLineAggregatorConfig whose
// embedded EncoderAggregatorConfig is set to DefaultEncoderAggregatorConfig();
// the JSON Lines encoder settings are left at their zero values.
func DefaultJSONLinesAggregatorConfig() JSONLineAggregatorConfig {
	var conf JSONLineAggregatorConfig
	conf.EncoderAggregatorConfig = DefaultEncoderAggregatorConfig()
	return conf
}
// NewJSONLinesAggregator returns an aggregator that writes samples in JSON
// Lines format: each output line is a valid JSON value of one sample.
// See http://jsonlines.org/ for details.
func NewJSONLinesAggregator(conf JSONLineAggregatorConfig) core.Aggregator {
	var encoderFactory NewSampleEncoder = func(out io.Writer, onFlush func()) SampleEncoder {
		// Wrap the destination in a callback writer built from onFlush
		// before handing it to the JSON encoder.
		return NewJSONEncoder(ioutil2.NewCallbackWriter(out, onFlush), conf.JSONLineEncoderConfig)
	}
	return NewEncoderAggregator(encoderFactory, conf.EncoderAggregatorConfig)
}
// NewJSONEncoder builds a SampleEncoder that writes JSON values to w through
// a jsoniter stream layered on top of a bufio.Writer. The jsoniter API is
// frozen from conf.JSONIterConfig, and both layers are sized with
// conf.BufferSizeOrDefault().
func NewJSONEncoder(w io.Writer, conf JSONLineEncoderConfig) SampleEncoder {
	var iterConf jsoniter.Config
	config.Map(&iterConf, conf.JSONIterConfig)
	frozen := iterConf.Froze()

	// NOTE(skipor): the stream's internal buffering does not really work
	// (reason unknown), hence the explicit bufio layer.
	// OPTIMIZE(skipor): skip the bufio wrapper when w already is an
	// ioutil2.ByteWriter.
	buffered := bufio.NewWriterSize(w, conf.BufferSizeOrDefault())
	jsonStream := jsoniter.NewStream(frozen, buffered, conf.BufferSizeOrDefault())
	return &jsonEncoder{Stream: jsonStream, buf: buffered}
}
// jsonEncoder is the encoder returned by NewJSONEncoder. It embeds the
// jsoniter stream used for encoding and keeps the bufio.Writer that the
// stream writes into so that Flush can flush it as well.
type jsonEncoder struct {
*jsoniter.Stream
// buf is the buffered writer passed to the stream as its output.
buf *bufio.Writer
}
// Encode writes s to the stream as a single JSON value followed by a newline
// and returns the stream's current Error field.
func (e *jsonEncoder) Encode(s core.Sample) error {
	stream := e.Stream
	stream.WriteVal(s)
	stream.WriteRaw("\n")
	return stream.Error
}
// Flush flushes the jsoniter stream into the bufio.Writer and then flushes
// the bufio.Writer itself.
//
// Both layers are always flushed, even when the stream flush fails. The
// stream error is returned if there is one; otherwise the bufio.Writer error
// is returned instead of being discarded, so a failed write to the
// underlying writer is no longer silently lost.
func (e *jsonEncoder) Flush() error {
	streamErr := e.Stream.Flush()
	// Flush the buffered writer regardless of streamErr, as before, so that
	// whatever reached the buffer is still handed to the underlying writer.
	bufErr := e.buf.Flush()
	if streamErr != nil {
		return streamErr
	}
	return bufErr
}