-
Notifications
You must be signed in to change notification settings - Fork 101
/
parse_report.go
259 lines (238 loc) · 7.58 KB
/
parse_report.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
package rillv1
import (
"encoding/json"
"errors"
"fmt"
"net/mail"
"strings"
"time"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers/slack"
"github.com/rilldata/rill/runtime/pkg/pbutil"
"google.golang.org/protobuf/types/known/structpb"
)
// ReportYAML is the raw structure of a Report resource defined in YAML (does not include common fields)
type ReportYAML struct {
commonYAML `yaml:",inline"` // Not accessed here, only setting it so we can use KnownFields for YAML parsing
Title string `yaml:"title"`
Refresh *ScheduleYAML `yaml:"refresh"`
Watermark string `yaml:"watermark"` // options: "trigger_time", "inherit"
Intervals struct {
Duration string `yaml:"duration"`
Limit uint `yaml:"limit"`
CheckUnclosed bool `yaml:"check_unclosed"`
} `yaml:"intervals"`
Timeout string `yaml:"timeout"`
Query struct {
Name string `yaml:"name"`
Args map[string]any `yaml:"args"`
ArgsJSON string `yaml:"args_json"`
} `yaml:"query"`
Export struct {
Format string `yaml:"format"`
Limit uint `yaml:"limit"`
} `yaml:"export"`
Email struct {
Recipients []string `yaml:"recipients"`
} `yaml:"email"`
Notify struct {
Email struct {
Recipients []string `yaml:"recipients"`
} `yaml:"email"`
Slack struct {
Users []string `yaml:"users"`
Channels []string `yaml:"channels"`
Webhooks []string `yaml:"webhooks"`
} `yaml:"slack"`
} `yaml:"notify"`
Annotations map[string]string `yaml:"annotations"`
}
// parseReport parses a report definition and adds the resulting resource to p.Resources.
func (p *Parser) parseReport(node *Node) error {
// Parse YAML
tmp := &ReportYAML{}
err := p.decodeNodeYAML(node, true, tmp)
if err != nil {
return err
}
// Validate SQL or connector isn't set
if node.SQL != "" {
return fmt.Errorf("reports cannot have SQL")
}
if !node.ConnectorInferred && node.Connector != "" {
return fmt.Errorf("reports cannot have a connector")
}
// Parse refresh schedule
schedule, err := parseScheduleYAML(tmp.Refresh)
if err != nil {
return err
}
// Parse watermark
watermarkInherit := false
if tmp.Watermark != "" {
switch strings.ToLower(tmp.Watermark) {
case "inherit":
watermarkInherit = true
case "trigger_time":
// Do nothing
default:
return fmt.Errorf(`invalid value %q for property "watermark"`, tmp.Watermark)
}
}
// Validate the interval duration as a standard ISO8601 duration (without Rill extensions) with only one component
if tmp.Intervals.Duration != "" {
err := validateISO8601(tmp.Intervals.Duration, true, true)
if err != nil {
return fmt.Errorf(`invalid value %q for property "intervals.duration"`, tmp.Intervals.Duration)
}
}
// Parse timeout
var timeout time.Duration
if tmp.Timeout != "" {
timeout, err = parseDuration(tmp.Timeout)
if err != nil {
return err
}
}
// Query name
if tmp.Query.Name == "" {
return fmt.Errorf(`invalid value %q for property "query.name"`, tmp.Query.Name)
}
// Query args
if tmp.Query.ArgsJSON != "" {
// Validate JSON
if !json.Valid([]byte(tmp.Query.ArgsJSON)) {
return errors.New(`failed to parse "query.args_json" as JSON`)
}
} else {
// Fall back to query.args if query.args_json is not set
data, err := json.Marshal(tmp.Query.Args)
if err != nil {
return fmt.Errorf(`failed to serialize "query.args" to JSON: %w`, err)
}
tmp.Query.ArgsJSON = string(data)
}
if tmp.Query.ArgsJSON == "" {
return errors.New(`missing query args (must set either "query.args" or "query.args_json")`)
}
// Parse export format
exportFormat, err := parseExportFormat(tmp.Export.Format)
if err != nil {
return err
}
if exportFormat == runtimev1.ExportFormat_EXPORT_FORMAT_UNSPECIFIED {
return fmt.Errorf(`missing required property "export.format"`)
}
if len(tmp.Email.Recipients) > 0 && len(tmp.Notify.Email.Recipients) > 0 {
return errors.New(`cannot set both "email.recipients" and "notify.email.recipients"`)
}
isLegacySyntax := len(tmp.Email.Recipients) > 0
// Validate recipients
if isLegacySyntax {
// Backward compatibility
for _, email := range tmp.Email.Recipients {
_, err := mail.ParseAddress(email)
if err != nil {
return fmt.Errorf("invalid recipient email address %q", email)
}
}
} else {
if len(tmp.Notify.Email.Recipients) == 0 && len(tmp.Notify.Slack.Channels) == 0 &&
len(tmp.Notify.Slack.Users) == 0 && len(tmp.Notify.Slack.Webhooks) == 0 {
return fmt.Errorf(`missing notification recipients`)
}
for _, email := range tmp.Notify.Email.Recipients {
_, err := mail.ParseAddress(email)
if err != nil {
return fmt.Errorf("invalid recipient email address %q", email)
}
}
for _, email := range tmp.Notify.Slack.Users {
_, err := mail.ParseAddress(email)
if err != nil {
return fmt.Errorf("invalid recipient email address %q", email)
}
}
}
// Track report
r, err := p.insertResource(ResourceKindReport, node.Name, node.Paths, node.Refs...)
if err != nil {
return err
}
// NOTE: After calling insertResource, an error must not be returned. Any validation should be done before calling it.
r.ReportSpec.Title = tmp.Title
if schedule != nil {
r.ReportSpec.RefreshSchedule = schedule
}
r.ReportSpec.WatermarkInherit = watermarkInherit
r.ReportSpec.IntervalsIsoDuration = tmp.Intervals.Duration
r.ReportSpec.IntervalsLimit = int32(tmp.Intervals.Limit)
r.ReportSpec.IntervalsCheckUnclosed = tmp.Intervals.CheckUnclosed
if timeout != 0 {
r.ReportSpec.TimeoutSeconds = uint32(timeout.Seconds())
}
r.ReportSpec.QueryName = tmp.Query.Name
r.ReportSpec.QueryArgsJson = tmp.Query.ArgsJSON
r.ReportSpec.ExportLimit = uint64(tmp.Export.Limit)
r.ReportSpec.ExportFormat = exportFormat
if isLegacySyntax {
// Backwards compatibility
// Email settings
notifier, err := structpb.NewStruct(map[string]any{
"recipients": pbutil.ToSliceAny(tmp.Email.Recipients),
})
if err != nil {
return fmt.Errorf("encountered invalid property type: %w", err)
}
r.ReportSpec.Notifiers = []*runtimev1.Notifier{
{
Connector: "email",
Properties: notifier,
},
}
} else {
// Email settings
if len(tmp.Notify.Email.Recipients) > 0 {
props, err := structpb.NewStruct(map[string]any{
"recipients": pbutil.ToSliceAny(tmp.Notify.Email.Recipients),
})
if err != nil {
return fmt.Errorf("encountered invalid property type: %w", err)
}
r.ReportSpec.Notifiers = append(r.ReportSpec.Notifiers, &runtimev1.Notifier{
Connector: "email",
Properties: props,
})
}
// Slack settings
if len(tmp.Notify.Slack.Channels) > 0 || len(tmp.Notify.Slack.Users) > 0 || len(tmp.Notify.Slack.Webhooks) > 0 {
props, err := structpb.NewStruct(slack.EncodeProps(tmp.Notify.Slack.Users, tmp.Notify.Slack.Channels, tmp.Notify.Slack.Webhooks))
if err != nil {
return err
}
r.ReportSpec.Notifiers = append(r.ReportSpec.Notifiers, &runtimev1.Notifier{
Connector: "slack",
Properties: props,
})
}
}
r.ReportSpec.Annotations = tmp.Annotations
return nil
}
func parseExportFormat(s string) (runtimev1.ExportFormat, error) {
switch strings.ToLower(s) {
case "":
return runtimev1.ExportFormat_EXPORT_FORMAT_UNSPECIFIED, nil
case "csv":
return runtimev1.ExportFormat_EXPORT_FORMAT_CSV, nil
case "xlsx":
return runtimev1.ExportFormat_EXPORT_FORMAT_XLSX, nil
case "parquet":
return runtimev1.ExportFormat_EXPORT_FORMAT_PARQUET, nil
default:
if val, ok := runtimev1.ExportFormat_value[s]; ok {
return runtimev1.ExportFormat(val), nil
}
return runtimev1.ExportFormat_EXPORT_FORMAT_UNSPECIFIED, fmt.Errorf("invalid export format %q", s)
}
}