-
Notifications
You must be signed in to change notification settings - Fork 567
/
json.go
154 lines (143 loc) · 3 KB
/
json.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package sdata
import (
"bufio"
"database/sql"
"encoding/json"
"io"
"time"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
)
// JSONWriter writes tuples as newline separated json objects.
type JSONWriter struct {
	bufw   *bufio.Writer          // buffers writes to the underlying io.Writer; drained by Flush
	enc    *json.Encoder          // encodes onto bufw; Encode appends a trailing newline per object
	fields []string               // column names in tuple order; used as the JSON object keys
	record map[string]interface{} // scratch map reused across WriteTuple calls to avoid per-row allocation
}
// TODO: figure out some way to specify a projection so that we can write nested structures.

// NewJSONWriter returns a JSONWriter that emits one JSON object per line to w,
// keyed by fieldNames in tuple-column order.
func NewJSONWriter(w io.Writer, fieldNames []string) *JSONWriter {
	buffered := bufio.NewWriter(w)
	return &JSONWriter{
		bufw:   buffered,
		enc:    json.NewEncoder(buffered),
		fields: fieldNames,
		record: make(map[string]interface{}, len(fieldNames)),
	}
}
// WriteTuple encodes row as a single JSON object keyed by the writer's field
// names and writes it (newline terminated) to the buffered output.
//
// sql.Null* column values are unwrapped: invalid (NULL) values encode as JSON
// null, valid ones as their underlying Go value. Time values are rendered via
// RFC 3339 (nanosecond precision) and passed through formatTimestampNTZ.
// Returns ErrTupleFields if row's arity does not match the configured fields.
func (m *JSONWriter) WriteTuple(row Tuple) error {
	if len(row) != len(m.fields) {
		return ErrTupleFields{Writer: m, Fields: m.fields, Tuple: row}
	}
	for i, col := range row {
		// v stays nil (→ JSON null) unless the column holds a valid value.
		var v interface{}
		switch c := col.(type) {
		case *sql.NullBool:
			if c.Valid {
				v = c.Bool
			}
		case *sql.NullByte:
			if c.Valid {
				v = c.Byte
			}
		case *sql.NullInt16:
			if c.Valid {
				v = c.Int16
			}
		case *sql.NullInt32:
			if c.Valid {
				v = c.Int32
			}
		case *sql.NullInt64:
			if c.Valid {
				v = c.Int64
			}
		case *sql.NullFloat64:
			if c.Valid {
				v = c.Float64
			}
		case *sql.NullString:
			if c.Valid {
				v = c.String
			}
		case *sql.NullTime:
			if c.Valid {
				v = formatTimestampNTZ(c.Time.Format(time.RFC3339Nano))
			}
		case *time.Time:
			v = formatTimestampNTZ(c.Format(time.RFC3339Nano))
		case *sql.RawBytes:
			v = string(*c)
		default:
			v = col
		}
		m.record[m.fields[i]] = v
	}
	return errors.EnsureStack(m.enc.Encode(m.record))
}
// Flush writes any buffered output to the underlying writer.
func (m *JSONWriter) Flush() error {
	err := m.bufw.Flush()
	return errors.EnsureStack(err)
}
// JSONParser reads a stream of JSON objects and extracts named fields into tuples.
type JSONParser struct {
	dec        *json.Decoder          // decodes successive JSON objects from the input stream
	fieldNames []string               // keys to pull from each decoded object, in tuple-column order
	m          map[string]interface{} // scratch map reused across Next calls; managed by getMap
}
// NewJSONParser returns a TupleReader that decodes one JSON object per tuple
// from r, extracting the values named by fieldNames.
func NewJSONParser(r io.Reader, fieldNames []string) TupleReader {
	p := &JSONParser{
		dec:        json.NewDecoder(r),
		fieldNames: fieldNames,
	}
	// UseNumber() is necessary to correctly parse large int64s, we have to first parse them
	// as json.Numbers and then handle those in convert. Otherwise they are parsed as float64s
	// and precision is lost for values above ~2^53.
	p.dec.UseNumber()
	return p
}
// Next decodes the next JSON object from the stream and assigns its fields
// into row, position by position according to the parser's field names.
// Fields absent from the decoded object set the corresponding row element to
// nil. Returns ErrTupleFields when row's arity does not match the configured
// field names; decode errors are returned with stack information attached.
func (p *JSONParser) Next(row Tuple) error {
	if len(row) != len(p.fieldNames) {
		return ErrTupleFields{Fields: p.fieldNames, Tuple: row}
	}
	m := p.getMap()
	if err := p.dec.Decode(&m); err != nil {
		return errors.EnsureStack(err)
	}
	for i, name := range p.fieldNames {
		if value, present := m[name]; present {
			if err := convert(row[i], value); err != nil {
				return err
			}
		} else {
			row[i] = nil
		}
	}
	return nil
}
// getMap returns the parser's reusable scratch map, lazily allocating it on
// first use and otherwise emptying any entries left over from a prior Next.
func (p *JSONParser) getMap() map[string]interface{} {
	if p.m == nil {
		p.m = make(map[string]interface{})
		return p.m
	}
	for key := range p.m {
		delete(p.m, key)
	}
	return p.m
}