forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 1
/
util.go
152 lines (134 loc) · 3.11 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package message
import (
"bytes"
"encoding/json"
"errors"
"io"
"os"
"strconv"
"time"
)
var useNumber = true
func init() {
if os.Getenv("BENTHOS_USE_NUMBER") == "false" {
useNumber = false
}
}
//------------------------------------------------------------------------------
// GetAllBytes returns a 2D byte slice representing the raw byte content of the
// parts of a message.
func GetAllBytes(m Batch) [][]byte {
if len(m) == 0 {
return nil
}
parts := make([][]byte, len(m))
_ = m.Iter(func(i int, p *Part) error {
parts[i] = p.AsBytes()
return nil
})
return parts
}
//------------------------------------------------------------------------------
func decodeJSON(rawBytes []byte) (structured any, err error) {
dec := json.NewDecoder(bytes.NewReader(rawBytes))
if useNumber {
dec.UseNumber()
}
if err = dec.Decode(&structured); err != nil {
return
}
var dummy json.RawMessage
if err = dec.Decode(&dummy); errors.Is(err, io.EOF) {
err = nil
return
}
structured = nil
if err = dec.Decode(&dummy); err == nil || err == io.EOF {
err = errors.New("message contains multiple valid documents")
}
return
}
func encodeJSON(d any) (rawBytes []byte) {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
enc.SetEscapeHTML(false)
if err := enc.Encode(d); err != nil {
return nil
}
if buf.Len() > 1 {
rawBytes = buf.Bytes()[:buf.Len()-1]
}
return
}
// Copy of query.IToString
func metaToString(i any) string {
switch t := i.(type) {
case string:
return t
case []byte:
return string(t)
case int64:
return strconv.FormatInt(t, 10)
case uint64:
return strconv.FormatUint(t, 10)
case float64:
return strconv.FormatFloat(t, 'g', -1, 64)
case json.Number:
return t.String()
case bool:
if t {
return "true"
}
return "false"
case time.Time:
return t.Format(time.RFC3339Nano)
case nil:
return `null`
}
// Last resort
return string(encodeJSON(i))
}
//------------------------------------------------------------------------------
func cloneMap(oldMap map[string]any) map[string]any {
newMap := make(map[string]any, len(oldMap))
for k, v := range oldMap {
newMap[k] = cloneGeneric(v)
}
return newMap
}
func cloneCheekyMap(oldMap map[any]any) map[any]any {
newMap := make(map[any]any, len(oldMap))
for k, v := range oldMap {
newMap[k] = cloneGeneric(v)
}
return newMap
}
func cloneSlice(oldSlice []any) []any {
newSlice := make([]any, len(oldSlice))
for i, v := range oldSlice {
newSlice[i] = cloneGeneric(v)
}
return newSlice
}
// cloneGeneric is a utility function that recursively copies a generic
// structure usually resulting from a JSON parse.
func cloneGeneric(root any) any {
switch t := root.(type) {
case map[string]any:
return cloneMap(t)
case map[any]any:
return cloneCheekyMap(t)
case []any:
return cloneSlice(t)
default:
// Oops, this means we have 'dirty' types within the object, we pass
// these through uncloned and hope that the author knows what they're
// doing.
return root
}
}
// CopyJSON recursively creates a deep copy of a JSON structure extracted from a
// message part.
func CopyJSON(root any) any {
return cloneGeneric(root)
}