-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
task.jsonparse.go
107 lines (91 loc) · 2.73 KB
/
task.jsonparse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package pipeline
import (
	"context"
	"database/sql/driver"
	"encoding/json"
	"fmt"
	"math/big"
	"strings"

	"github.com/pkg/errors"
)
// JSONParseTask is a pipeline task that parses its input as JSON and extracts
// the value located at Path.
type JSONParseTask struct {
	BaseTask `mapstructure:",squash"`

	// Path is the ordered list of object keys and array indexes that Run
	// follows through the decoded JSON document.
	Path JSONPath `json:"path"`

	// Lax controls what Run does when a key is missing or an array index is
	// out of range. When false, Run returns an error. When true, Run returns
	// a nil value with no error.
	Lax bool
}
// Compile-time check that *JSONParseTask satisfies the Task interface.
var _ Task = (*JSONParseTask)(nil)
// Type reports the task type of a JSONParseTask, which is always
// TaskTypeJSONParse.
func (t *JSONParseTask) Type() TaskType {
	return TaskTypeJSONParse
}
// SetDefaults is a no-op for JSONParseTask: it ignores all of its arguments
// and always returns nil.
func (t *JSONParseTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error {
	return nil
}
// Run parses its single input as JSON and walks t.Path through the decoded
// document, returning the value found at the end of the path.
//
// inputs must contain exactly one Result. If that Result carries an error it
// is passed through unchanged. Its Value must be a []byte or a string holding
// the JSON text; any other type is an error.
//
// Each path segment is used as an object key when the current value is a JSON
// object, or as a base-10 array index when the current value is a JSON array.
// A negative index counts back from the end of the array (-1 is the last
// element).
//
// A missing key or an out-of-range index yields a nil Value with no error
// when t.Lax is set, and an error otherwise. A segment that is not an integer
// where an array index is required, and a path that descends into a value
// that is neither an object nor an array, are errors regardless of t.Lax.
func (t *JSONParseTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) (result Result) {
	if len(inputs) != 1 {
		return Result{Error: errors.Wrapf(ErrWrongInputCardinality, "JSONParseTask requires a single input")}
	}
	if inputs[0].Error != nil {
		return Result{Error: inputs[0].Error}
	}

	var bs []byte
	switch v := inputs[0].Value.(type) {
	case []byte:
		bs = v
	case string:
		bs = []byte(v)
	default:
		return Result{Error: errors.Errorf("JSONParseTask does not accept inputs of type %T", inputs[0].Value)}
	}

	var decoded interface{}
	if err := json.Unmarshal(bs, &decoded); err != nil {
		return Result{Error: err}
	}

	// unresolved builds the error reported whenever the path cannot be
	// followed through the document.
	unresolved := func() Result {
		return Result{Error: errors.Errorf(`could not resolve path ["%v"] in %s`, strings.Join(t.Path, `","`), bs)}
	}
	// missing is the outcome for an absent key or out-of-range index: a nil
	// value in lax mode, the unresolved-path error otherwise.
	missing := func() Result {
		if t.Lax {
			return Result{Value: nil}
		}
		return unresolved()
	}

	for _, part := range t.Path {
		switch d := decoded.(type) {
		case map[string]interface{}:
			next, exists := d[part]
			if !exists {
				return missing()
			}
			decoded = next
		case []interface{}:
			bigindex, ok := new(big.Int).SetString(part, 10)
			if !ok {
				return Result{Error: errors.Errorf("JSONParse task error: %v is not a valid array index", part)}
			}
			// An index that does not even fit in an int64 can never be in range.
			if !bigindex.IsInt64() {
				return missing()
			}
			// Do the arithmetic and the bounds check in int64. Converting to
			// int first would truncate on platforms where int is 32 bits, so
			// a huge index could wrap around to a valid one.
			index, length := bigindex.Int64(), int64(len(d))
			if index < 0 {
				index += length
			}
			if index < 0 || index >= length {
				return missing()
			}
			decoded = d[index]
		default:
			// Scalars and null cannot be descended into.
			return unresolved()
		}
	}
	return Result{Value: decoded}
}
type JSONPath []string
func (p *JSONPath) UnmarshalText(bs []byte) error {
*p = strings.Split(string(bs), ",")
return nil
}
func (p *JSONPath) Scan(value interface{}) error {
return json.Unmarshal(value.([]byte), p)
}
func (p JSONPath) Value() (driver.Value, error) {
return json.Marshal(p)
}