/
import.go
341 lines (296 loc) · 9.3 KB
/
import.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
// Copyright 2021 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
package config
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"reflect"
"sort"
"strings"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
yaml "gopkg.in/yaml.v3"
)
type formattedError struct {
s string
}
func (fe *formattedError) Error() string {
return fe.s
}
// clusterConfig represents a redpanda configuration.
type clusterConfig map[string]any
// A custom unmarshal is needed because go-yaml parse "YYYY-MM-DD" as a full
// timestamp, writing YYYY-MM-DD HH:MM:SS +0000 UTC when encoding, so we are
// going to treat timestamps as strings.
// See: https://github.com/go-yaml/yaml/issues/770
func replaceTimestamp(n *yaml.Node) {
if len(n.Content) == 0 {
return
}
for _, innerNode := range n.Content {
if innerNode.Tag == "!!map" {
replaceTimestamp(innerNode)
}
if innerNode.Tag == "!!timestamp" {
innerNode.Tag = "!!str"
}
}
}
func (c *clusterConfig) UnmarshalYAML(n *yaml.Node) error {
replaceTimestamp(n)
var a map[string]any
err := n.Decode(&a)
if err != nil {
return err
}
*c = a
return nil
}
func importConfig(
ctx context.Context,
client *admin.AdminAPI,
filename string,
oldConfig admin.Config,
oldConfigFull admin.Config,
schema admin.ConfigSchema,
all bool,
) (err error) {
readbackBytes, err := os.ReadFile(filename)
if err != nil {
return fmt.Errorf("error reading file %s: %v", filename, err)
}
var readbackConfig clusterConfig
err = yaml.Unmarshal(readbackBytes, &readbackConfig)
if err != nil {
return fmt.Errorf("error parsing edited config: %v", err)
}
type propertyDelta struct {
Property string
OldValue string
NewValue string
}
var propertyDeltas []propertyDelta
// Calculate deltas
upsert := make(map[string]interface{})
remove := make([]string, 0)
for k, v := range readbackConfig {
oldVal, haveOldVal := oldConfig[k]
oldValMaterialized, haveOldValMaterialized := oldConfigFull[k]
if meta, ok := schema[k]; ok {
// For numeric types need special handling because
// yaml encoding will see '1' as an integer, even
// if it is given as the value for a floating point
// ('number') config property, and vice versa.
if meta.Type == "integer" {
if vFloat, ok := v.(float64); ok {
v = int(vFloat)
}
if oldVal != nil {
oldVal = int(oldVal.(float64))
}
if oldValMaterialized != nil {
oldValMaterialized = int(oldValMaterialized.(float64))
}
} else if meta.Type == "number" {
if vInt, ok := v.(int); ok {
v = float64(vInt)
}
} else if meta.Type == "array" && meta.Items.Type == "string" {
switch vArray := v.(type) {
case []interface{}:
// Normal case: user input is a yaml array
v = loadStringArray(vArray)
default:
// Pass, let the server attempt validation
}
if oldVal != nil {
oldVal = loadStringArray(oldVal.([]interface{}))
}
if oldValMaterialized != nil {
oldValMaterialized = loadStringArray(oldValMaterialized.([]interface{}))
}
}
// For types that aren't numeric or array, pass them through as-is
}
// We exclude cluster_id from upsert here and remove below to avoid any
// accidental duplication of the ID from one cluster to another
if k == "cluster_id" {
continue
}
addProperty := func(old interface{}) {
upsert[k] = v
// If the value is not [secret], the user changed the redacted sentinel
// value and is changing the secret. We redact the value that we store
// to our to-be-printed propertyDeltas.
if meta, ok := schema[k]; ok {
if v != nil && meta.IsSecret && fmt.Sprintf("%v", v) != "[secret]" {
v = "[redacted]"
}
}
propertyDeltas = append(propertyDeltas, propertyDelta{k, fmt.Sprintf("%v", old), fmt.Sprintf("%v", v)})
}
if haveOldVal {
// Since the admin endpoint will redact secret fields, ignore any
// such sentinel strings we've been given, to avoid accidentally
// setting configs to this value.
if fmt.Sprintf("%v", oldVal) == "[secret]" && fmt.Sprintf("%v", v) == "[secret]" {
continue
}
// If value changed, add it to list of updates.
// DeepEqual because values can be slices.
if !reflect.DeepEqual(oldVal, v) {
addProperty(oldVal)
}
} else {
// Present in input but not original config, insert if it differs
// from the materialized current value (which may be a default)
if !haveOldValMaterialized || !reflect.DeepEqual(oldValMaterialized, v) {
addProperty(oldValMaterialized)
}
}
}
for k := range oldConfigFull {
if _, found := readbackConfig[k]; !found {
if k == "cluster_id" {
// see above
continue
}
meta, inSchema := schema[k]
if !inSchema {
continue
}
if !all && meta.Visibility == "tunable" {
continue
}
oldValue, found := oldConfig[k]
if found {
propertyDeltas = append(propertyDeltas, propertyDelta{k, fmt.Sprintf("%v", oldValue), ""})
remove = append(remove, k)
}
}
}
if len(upsert) == 0 && len(remove) == 0 {
fmt.Println("No changes were made.")
return nil
}
tw := out.NewTable("PROPERTY", "PRIOR", "NEW")
for _, pd := range propertyDeltas {
tw.PrintStructFields(pd)
}
tw.Flush()
// Newline between table and result of write
fmt.Printf("\n")
// PUT to admin API
result, err := client.PatchClusterConfig(ctx, upsert, remove)
if he := (*admin.HTTPResponseError)(nil); errors.As(err, &he) {
// Special case 400 (validation) errors with friendly output
// about which configuration properties were invalid.
if he.Response.StatusCode == 400 {
ve, err := formatValidationError(err, he)
if err != nil {
return fmt.Errorf("error setting config: %v", err)
}
return &formattedError{ve}
}
}
// If we didn't handle a structured 400 error, check for other errors.
if err != nil {
return fmt.Errorf("error setting config: %v", err)
}
fmt.Printf("Successfully updated configuration. New configuration version is %d.\n", result.ConfigVersion)
status, err := client.ClusterConfigStatus(ctx, true)
out.MaybeDie(err, "unable to check if the cluster needs to be restarted: %v\nCheck the status with 'rpk cluster config status'.", err)
for _, value := range status {
if value.Restart {
fmt.Print("\nCluster needs to be restarted. See more details with 'rpk cluster config status'.\n")
break
}
}
return nil
}
func formatValidationError(
err error, httpErr *admin.HTTPResponseError,
) (string, error) {
// Output structured validation errors from server
var validationErrs map[string]string
bodyErr := json.Unmarshal(httpErr.Body, &validationErrs)
// If no proper JSON body, fall back to generic HTTP error report
if bodyErr != nil {
return "", err
}
type kv struct{ k, v string }
var sortedErrs []kv
for k, v := range validationErrs {
sortedErrs = append(sortedErrs, kv{k, v})
}
sort.Slice(sortedErrs, func(i, j int) bool { return sortedErrs[i].k < sortedErrs[j].k })
var buf strings.Builder
fmt.Fprintf(&buf, "Validation errors:\n")
for _, kv := range sortedErrs {
fmt.Fprintf(&buf, " * %s: %s\n", kv.k, kv.v)
}
fmt.Fprintf(&buf, "\n")
return buf.String(), nil
}
func newImportCommand(fs afero.Fs, all *bool) *cobra.Command {
var filename string
cmd := &cobra.Command{
Use: "import",
Short: "Import cluster configuration from a file",
Long: `Import cluster configuration from a file.
Import configuration from a YAML file, usually generated with
corresponding 'export' command. This downloads the current cluster
configuration, calculates the difference with the YAML file, and
updates any properties that were changed. If a property is removed
from the YAML file, it is reset to its default value. `,
Run: func(cmd *cobra.Command, args []string) {
p := config.ParamsFromCommand(cmd)
cfg, err := p.Load(fs)
out.MaybeDie(err, "unable to load config: %v", err)
client, err := admin.NewClient(fs, cfg)
out.MaybeDie(err, "unable to initialize admin client: %v", err)
// GET the schema
schema, err := client.ClusterConfigSchema(cmd.Context())
out.MaybeDie(err, "unable to query config schema: %v", err)
// GET current config
currentConfig, err := client.Config(cmd.Context(), false)
out.MaybeDie(err, "unable to query config values: %v", err)
currentFullConfig, err := client.Config(cmd.Context(), true)
out.MaybeDie(err, "unable to query config values: %v", err)
// Read back template & parse
err = importConfig(cmd.Context(), client, filename, currentConfig, currentFullConfig, schema, *all)
if fe := (*formattedError)(nil); errors.As(err, &fe) {
fmt.Fprint(os.Stderr, err)
out.Die("No changes were made")
}
out.MaybeDie(err, "error updating config: %v", err)
},
}
cmd.Flags().StringVarP(
&filename,
"filename",
"f",
"",
"full path to file to import, e.g. '/tmp/config.yml'",
)
return cmd
}
func loadStringArray(input []interface{}) []string {
result := make([]string, len(input))
for i, v := range input {
result[i] = fmt.Sprintf("%v", v)
}
return result
}