-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
217 lines (185 loc) · 5.76 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package main
import (
"encoding/json"
"flag"
"fmt"
"io"
"os"
"time"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
)
// Version is the release identifier of this tool; the -version flag prints it.
const Version = "0.1.0"
// PrometheusRecord is one entry of the ndjson-encoded stream used for
// transporting metrics through firehose.
type PrometheusRecord struct {
	// Body is read from the JSON key "b". decodePrompbWriteReq treats it as a
	// snappy-compressed, protobuf-encoded prompb.WriteRequest.
	Body []byte `json:"b"`
}
// TimeSeries represents a decoded Prometheus time series in a more readable
// format.
//
// Timestamps and Values are parallel slices: element i of each describes the
// same sample.
type TimeSeries struct {
	// Labels maps each label name to its value.
	Labels map[string]string `json:"labels"`
	// Timestamps holds the sample timestamps as found in the source samples.
	Timestamps []int64 `json:"timestamps"`
	// Values holds the sample values.
	Values []float64 `json:"values"`
}

// MarshalJSON encodes the series exactly like the default struct encoding
// when every value is finite. encoding/json refuses to encode NaN and ±Inf
// float64 values, so a single such sample would otherwise make the whole
// object fail to encode; those values are written as the JSON strings "NaN",
// "+Inf" and "-Inf" instead.
//
// The string form is output-only: decoding it back into a TimeSeries is not
// supported.
func (ts TimeSeries) MarshalJSON() ([]byte, error) {
	// plain has the same fields and tags as TimeSeries but no methods, so
	// marshaling it cannot recurse into this function.
	type plain TimeSeries

	nonFinite := false
	for _, v := range ts.Values {
		// v-v is 0 for every finite v and NaN for NaN and ±Inf.
		if v-v != 0 {
			nonFinite = true
			break
		}
	}
	if !nonFinite {
		return json.Marshal(plain(ts))
	}

	values := make([]interface{}, len(ts.Values))
	for i, v := range ts.Values {
		if v-v != 0 {
			// fmt renders these as "NaN", "+Inf" and "-Inf".
			values[i] = fmt.Sprint(v)
			continue
		}
		values[i] = v
	}
	return json.Marshal(struct {
		Labels     map[string]string `json:"labels"`
		Timestamps []int64           `json:"timestamps"`
		Values     []interface{}     `json:"values"`
	}{
		Labels:     ts.Labels,
		Timestamps: ts.Timestamps,
		Values:     values,
	})
}
// WriteRequestJSON is the JSON-friendly counterpart of prompb.WriteRequest
// that convertToReadableJSON produces.
type WriteRequestJSON struct {
	// Timeseries holds one entry per series of the source request and is
	// emitted under the JSON key "timeseries".
	Timeseries []TimeSeries `json:"timeseries"`
}
// decodePrompbWriteReq unwraps the prompb.WriteRequest carried in record.Body.
//
// The body is first snappy-decompressed and the result is then parsed as a
// protobuf message. A failure in either step is returned wrapped in an error
// whose prefix names the step that failed.
func decodePrompbWriteReq(record *PrometheusRecord) (*prompb.WriteRequest, error) {
	// Step 1: undo the snappy compression.
	raw, err := snappy.Decode(nil, record.Body)
	if err != nil {
		return nil, fmt.Errorf("snappy decode error: %w", err)
	}

	// Step 2: parse the decompressed bytes as a WriteRequest.
	wreq := new(prompb.WriteRequest)
	err = proto.Unmarshal(raw, wreq)
	if err != nil {
		return nil, fmt.Errorf("protobuf unmarshal error: %w", err)
	}
	return wreq, nil
}
// convertToReadableJSON reshapes a prompb.WriteRequest into a
// WriteRequestJSON.
//
// Every series of wreq yields one TimeSeries, in the same order: its labels
// become a name-to-value map and its samples are split into parallel
// timestamp and value slices.
func convertToReadableJSON(wreq *prompb.WriteRequest) *WriteRequestJSON {
	series := make([]TimeSeries, 0, len(wreq.Timeseries))

	for _, src := range wreq.Timeseries {
		// Flatten the label pairs into a map.
		labelSet := make(map[string]string, len(src.Labels))
		for _, pair := range src.Labels {
			labelSet[pair.Name] = pair.Value
		}

		// Split the samples into two parallel slices.
		sampleCount := len(src.Samples)
		stamps := make([]int64, 0, sampleCount)
		vals := make([]float64, 0, sampleCount)
		for _, smp := range src.Samples {
			stamps = append(stamps, smp.Timestamp)
			vals = append(vals, smp.Value)
		}

		series = append(series, TimeSeries{
			Labels:     labelSet,
			Timestamps: stamps,
			Values:     vals,
		})
	}

	return &WriteRequestJSON{Timeseries: series}
}
// humanReadableTime renders a Prometheus timestamp, given in milliseconds
// since the Unix epoch, as an RFC 3339 string with sub-second precision. The
// result is expressed in the process's local time zone.
func humanReadableTime(timestamp int64) string {
	const msPerSecond = 1000

	seconds := timestamp / msPerSecond
	// The sub-second remainder is in milliseconds; time.Unix takes nanoseconds.
	nanos := (timestamp % msPerSecond) * int64(time.Millisecond)

	return time.Unix(seconds, nanos).Format(time.RFC3339Nano)
}
// streamingJSONDecoder pulls PrometheusRecord values one at a time out of a
// stream of JSON objects. The objects may simply follow one another; newline
// separators are not required.
type streamingJSONDecoder struct {
	// decoder reads the underlying JSON stream.
	decoder *json.Decoder
	// count is the number of records next has decoded successfully.
	count int
}
// newStreamingJSONDecoder returns a streamingJSONDecoder that reads from r,
// with its record count starting at zero.
func newStreamingJSONDecoder(r io.Reader) *streamingJSONDecoder {
	dec := json.NewDecoder(r)
	// UseNumber makes the decoder store numbers that land in interface{}
	// targets as json.Number instead of float64. Reading back-to-back JSON
	// values needs no configuration; json.Decoder supports that by itself.
	dec.UseNumber()

	s := new(streamingJSONDecoder)
	s.decoder = dec
	return s
}
// next decodes the following JSON value of the stream into a
// PrometheusRecord.
//
// On success it increments s.count and returns the record. On failure it
// returns a nil record together with the decoder's error unchanged and leaves
// s.count as it was; the error is io.EOF once the stream is exhausted.
func (s *streamingJSONDecoder) next() (*PrometheusRecord, error) {
	rec := new(PrometheusRecord)
	err := s.decoder.Decode(rec)
	if err != nil {
		return nil, err
	}
	s.count++
	return rec, nil
}
func main() {
inputFile := flag.String("input", "", "Input file containing PrometheusRecord entries (one per line)")
outputFile := flag.String("output", "", "Output file for JSON results (default: stdout)")
prettyPrint := flag.Bool("pretty", true, "Enable pretty-printing of JSON output")
humanTime := flag.Bool("human-time", false, "Show human-readable timestamps in output")
showVersion := flag.Bool("version", false, "Show version information and exit")
flag.Parse()
// Show version if requested
if *showVersion {
fmt.Printf("prometheus-decoder version %s\n", Version)
os.Exit(0)
}
if *inputFile == "" {
fmt.Println("Error: Input file is required")
flag.Usage()
os.Exit(1)
}
// Open input file
file, err := os.Open(*inputFile)
if err != nil {
fmt.Printf("Error opening input file: %v\n", err)
os.Exit(1)
}
defer file.Close()
// Prepare output
var output *os.File
if *outputFile == "" {
output = os.Stdout
} else {
output, err = os.Create(*outputFile)
if err != nil {
fmt.Printf("Error creating output file: %v\n", err)
os.Exit(1)
}
defer output.Close()
}
// Create JSON stream decoder
decoder := newStreamingJSONDecoder(file)
// Process each JSON object in the stream
for {
record, err := decoder.next()
if err == io.EOF {
break
}
if err != nil {
fmt.Printf("Error parsing JSON object #%d: %v\n", decoder.count, err)
continue
}
// Decode the PrometheusRecord
wreq, err := decodePrompbWriteReq(record)
if err != nil {
fmt.Printf("Error decoding JSON object #%d: %v\n", decoder.count, err)
continue
}
// Convert to our more readable format
jsonStruct := convertToReadableJSON(wreq)
// Apply human-readable time conversion if requested
if *humanTime {
for i := range jsonStruct.Timeseries {
humanTimes := make([]string, len(jsonStruct.Timeseries[i].Timestamps))
for j, ts := range jsonStruct.Timeseries[i].Timestamps {
humanTimes[j] = humanReadableTime(ts)
}
// We need to output this differently, so create a custom marshaling
// This would require a custom struct and marshaling approach
// For simplicity, we'll just add a note about it
fmt.Fprintf(output, "# Object %d: Human-readable timestamps for reference:\n", decoder.count)
for j, humanTime := range humanTimes {
fmt.Fprintf(output, "# Sample %d: %s\n", j, humanTime)
}
}
}
// Output the JSON
var jsonData []byte
if *prettyPrint {
jsonData, err = json.MarshalIndent(jsonStruct, "", " ")
} else {
jsonData, err = json.Marshal(jsonStruct)
}
if err != nil {
fmt.Printf("Error encoding JSON object #%d to JSON: %v\n", decoder.count, err)
continue
}
fmt.Fprintf(output, "# Object %d\n", decoder.count)
fmt.Fprintln(output, string(jsonData))
}
fmt.Printf("Successfully processed %d JSON objects\n", decoder.count)
}