diff --git a/cmd/influxd/inspect/inspect.go b/cmd/influxd/inspect/inspect.go index 0ad6c3c1c55..fb2f9ee8c9c 100644 --- a/cmd/influxd/inspect/inspect.go +++ b/cmd/influxd/inspect/inspect.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_lp" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsi" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsm" + typecheck "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/type_conflicts" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_seriesfile" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tombstone" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tsm" @@ -33,6 +34,17 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) { if err != nil { return nil, err } + + checkSchema, err := typecheck.NewCheckSchemaCommand(v) + if err != nil { + return nil, err + } + + mergeSchema, err := typecheck.NewMergeSchemaCommand(v) + if err != nil { + return nil, err + } + base.AddCommand(exportLp) base.AddCommand(report_tsi.NewReportTSICommand()) base.AddCommand(export_index.NewExportIndexCommand()) @@ -46,6 +58,8 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) { base.AddCommand(verify_wal.NewVerifyWALCommand()) base.AddCommand(report_tsm.NewReportTSMCommand()) base.AddCommand(build_tsi.NewBuildTSICommand()) + base.AddCommand(checkSchema) + base.AddCommand(mergeSchema) return base, nil } diff --git a/cmd/influxd/inspect/type_conflicts/check_schema.go b/cmd/influxd/inspect/type_conflicts/check_schema.go new file mode 100644 index 00000000000..e91ee9569c8 --- /dev/null +++ b/cmd/influxd/inspect/type_conflicts/check_schema.go @@ -0,0 +1,155 @@ +package typecheck + +import ( + "errors" + "fmt" + "io" + "io/fs" + "os" + "path" + "path/filepath" + "strings" + + "github.com/influxdata/influxdb/v2/kit/cli" + "github.com/influxdata/influxdb/v2/tsdb" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type TypeConflictChecker struct { + Path string + SchemaFile string + ConflictsFile string + Logger *zap.Logger + + logLevel zapcore.Level +} + +func NewCheckSchemaCommand(v *viper.Viper) (*cobra.Command, error) { + flags := TypeConflictChecker{} + + cmd := &cobra.Command{ + Use: "check-schema", + Short: "Check for conflicts in the types between shards", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, _ []string) error { + return checkSchemaRunE(cmd, flags) + }, + } + opts := []cli.Opt{ + { + DestP: &flags.Path, + Flag: "path", + Desc: "Path under which fields.idx files are located", + Default: ".", + }, + { + DestP: &flags.SchemaFile, + Flag: "schema-file", + Desc: "Filename schema data should be written to", + Default: "schema.json", + }, + { + DestP: &flags.ConflictsFile, + Flag: "conflicts-file", + Desc: "Filename conflicts data should be written to", + Default: "conflicts.json", + }, + { + DestP: &flags.logLevel, + Flag: "log-level", + Desc: "The level of logging used througout the command", + Default: zap.InfoLevel, + }, + } + + if err := cli.BindOptions(v, cmd, opts); err != nil { + return nil, err + } + return cmd, nil +} + +func checkSchemaRunE(_ *cobra.Command, tc TypeConflictChecker) error { + logconf := zap.NewProductionConfig() + logconf.Level = zap.NewAtomicLevelAt(tc.logLevel) + logger, err := logconf.Build() + if err != nil { + return err + } + tc.Logger = logger + + // Get a set of every measurement/field/type tuple present. + var schema Schema + schema, err = tc.readFields() + if err != nil { + return err + } + + if err := schema.WriteSchemaFile(tc.SchemaFile); err != nil { + return err + } + if err := schema.WriteConflictsFile(tc.ConflictsFile); err != nil { + return err + } + + return nil +} + +func (tc *TypeConflictChecker) readFields() (Schema, error) { + schema := NewSchema() + var root string + fi, err := os.Stat(tc.Path) + if err != nil { + return nil, err + } + if fi.IsDir() { + root = tc.Path + } else { + root = path.Dir(tc.Path) + } + fileSystem := os.DirFS(".") + err = fs.WalkDir(fileSystem, root, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return fmt.Errorf("error walking file: %w", err) + } + + if filepath.Base(path) == tsdb.FieldsChangeFile { + fmt.Printf("WARN: A %s file was encountered at %s. The database was not shutdown properly, results of this command may be incomplete\n", + tsdb.FieldsChangeFile, + path, + ) + return nil + } + + if filepath.Base(path) != "fields.idx" { + return nil + } + + dirs := strings.Split(path, string(os.PathSeparator)) + bucket := dirs[len(dirs)-4] + rp := dirs[len(dirs)-3] + fmt.Printf("Processing %s\n", path) + + mfs, err := tsdb.NewMeasurementFieldSet(path, tc.Logger) + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return fmt.Errorf("unable to open file %q: %w", path, err) + } + defer mfs.Close() + + measurements := mfs.MeasurementNames() + for _, m := range measurements { + for f, typ := range mfs.FieldsByString(m).FieldSet() { + schema.AddField(bucket, rp, m, f, typ.String()) + } + } + + return nil + }) + + return schema, err +} diff --git a/cmd/influxd/inspect/type_conflicts/merge_schema.go b/cmd/influxd/inspect/type_conflicts/merge_schema.go new file mode 100644 index 00000000000..d6d982d21e2 --- /dev/null +++ b/cmd/influxd/inspect/type_conflicts/merge_schema.go @@ -0,0 +1,76 @@ +package typecheck + +import ( + "errors" + + "github.com/influxdata/influxdb/v2/kit/cli" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +type MergeFilesCommand struct { + OutputFile string + ConflictsFile string +} + +func NewMergeSchemaCommand(v *viper.Viper) (*cobra.Command, error) { + flags := MergeFilesCommand{} + + cmd := &cobra.Command{ + Use: "merge-schema", + Short: "Merge a set of schema files from the check-schema command", + Args: cobra.MinimumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return mergeSchemaRunE(cmd, args, flags) + }, + } + + opts := []cli.Opt{ + { + DestP: &flags.OutputFile, + Flag: "schema-file", + Desc: "Filename for the output file", + Default: "schema.json", + }, + { + DestP: &flags.ConflictsFile, + Flag: "conflicts-file", + Desc: "Filename conflicts data should be written to", + Default: "conflicts.json", + }, + } + + if err := cli.BindOptions(v, cmd, opts); err != nil { + return nil, err + } + return cmd, nil +} + +func mergeSchemaRunE(_ *cobra.Command, args []string, mf MergeFilesCommand) error { + return mf.mergeFiles(args) +} + +func (rc *MergeFilesCommand) mergeFiles(filenames []string) error { + if len(filenames) < 1 { + return errors.New("at least 1 file must be specified") + } + + schema, err := SchemaFromFile(filenames[0]) + if err != nil { + return err + } + + for _, filename := range filenames[1:] { + other, err := SchemaFromFile(filename) + if err != nil { + return err + } + schema.Merge(other) + } + + if err := schema.WriteConflictsFile(rc.ConflictsFile); err != nil { + return err + } + + return schema.WriteSchemaFile(rc.OutputFile) +} diff --git a/cmd/influxd/inspect/type_conflicts/schema.go b/cmd/influxd/inspect/type_conflicts/schema.go new file mode 100644 index 00000000000..7e6a33d137a --- /dev/null +++ b/cmd/influxd/inspect/type_conflicts/schema.go @@ -0,0 +1,149 @@ +package typecheck + +import ( + "encoding/json" + "fmt" + "io" + "os" + "strings" + + errors2 "github.com/influxdata/influxdb/v2/pkg/errors" +) + +type UniqueField struct { + Database string `json:"database"` + Retention string `json:"retention"` + Measurement string `json:"measurement"` + Field string `json:"field"` +} + +type FieldTypes map[string]struct{} +type Schema map[string]FieldTypes + +func (ft FieldTypes) MarshalText() (text []byte, err error) { + s := make([]string, 0, len(ft)) + for f := range ft { + s = append(s, f) + } + return []byte(strings.Join(s, ",")), nil +} + +func (ft *FieldTypes) UnmarshalText(text []byte) error { + if *ft == nil { + *ft = make(FieldTypes) + } + for _, ty := range strings.Split(string(text), ",") { + (*ft)[ty] = struct{}{} + } + return nil +} + +func NewSchema() Schema { + return make(Schema) +} + +func SchemaFromFile(filename string) (Schema, error) { + f, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("unable to open schema file %q: %w", filename, err) + } + + s := NewSchema() + if err := s.Decode(f); err != nil { + return nil, fmt.Errorf("unable to decode schema file %q: %w", filename, err) + } + return s, nil +} + +func (uf *UniqueField) String() string { + return fmt.Sprintf("%q.%q.%q.%q", uf.Database, uf.Retention, uf.Measurement, uf.Field) +} + +func (s Schema) AddField(database, retention, measurement, field, dataType string) { + uf := UniqueField{ + Database: database, + Retention: retention, + Measurement: measurement, + Field: field, + } + s.AddFormattedField(uf.String(), dataType) +} + +func (s Schema) AddFormattedField(field string, dataType string) { + if _, ok := s[field]; !ok { + s[field] = make(map[string]struct{}) + } + s[field][dataType] = struct{}{} +} + +func (s Schema) Merge(schema Schema) { + for field, types := range schema { + for t := range types { + s.AddFormattedField(field, t) + } + } +} + +func (s Schema) Conflicts() Schema { + cs := NewSchema() + for field, t := range s { + if len(t) > 1 { + for ty := range t { + cs.AddFormattedField(field, ty) + } + } + } + return cs +} + +func (s Schema) WriteSchemaFile(filename string) error { + if len(s) == 0 { + fmt.Println("No schema file generated: no valid measurements/fields found") + return nil + } + + if err := s.encodeSchema(filename); err != nil { + return fmt.Errorf("unable to write schema file to %q: %w", filename, err) + } + fmt.Printf("Schema file written successfully to: %q\n", filename) + return nil +} + +func (s Schema) WriteConflictsFile(filename string) error { + conflicts := s.Conflicts() + if len(conflicts) == 0 { + fmt.Println("No conflicts file generated: no conflicts found") + return nil + } + + if err := conflicts.encodeSchema(filename); err != nil { + return fmt.Errorf("unable to write conflicts file to %q: %w", filename, err) + } + fmt.Printf("Conflicts file written successfully to: %q\n", filename) + return nil +} + +func (s Schema) encodeSchema(filename string) (rErr error) { + schemaFile, err := os.Create(filename) + defer errors2.Capture(&rErr, schemaFile.Close) + if err != nil { + return fmt.Errorf("unable to create schema file: %w", err) + } + return s.Encode(schemaFile) +} + +func (s Schema) Encode(w io.Writer) error { + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + if err := enc.Encode(s); err != nil { + return fmt.Errorf("unable to encode schema: %w", err) + } + return nil +} + +func (s Schema) Decode(r io.Reader) error { + if err := json.NewDecoder(r).Decode(&s); err != nil { + return fmt.Errorf("unable to decode schema: %w", err) + } + return nil +} diff --git a/cmd/influxd/inspect/type_conflicts/schema_test.go b/cmd/influxd/inspect/type_conflicts/schema_test.go new file mode 100644 index 00000000000..b4b34764c56 --- /dev/null +++ b/cmd/influxd/inspect/type_conflicts/schema_test.go @@ -0,0 +1,78 @@ +package typecheck_test + +import ( + "bytes" + "testing" + + typecheck "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/type_conflicts" + "github.com/stretchr/testify/assert" +) + +func TestSchema_Encoding(t *testing.T) { + s := typecheck.NewSchema() + + b := bytes.Buffer{} + + s.AddField("db1", "rp1", "foo", "v2", "float") + s.AddField("db1", "rp1", "foo", "v2", "bool") + s.AddField("db1", "rp1", "bZ", "v1", "int") + + err := s.Encode(&b) + assert.NoError(t, err, "encode failed unexpectedly") + s2 := typecheck.NewSchema() + err = s2.Decode(&b) + assert.NoError(t, err, "decode failed unexpectedly") + assert.Len(t, s2, 2, "wrong number of fields - expected %d, got %d", 2, len(s)) + for f1, fields1 := range s { + assert.Len(t, + s2[f1], + len(fields1), + "differing number of types for a conflicted field %s: expected %d, got %d", + f1, + len(fields1), + len(s2[f1])) + } +} + +type filler struct { + typecheck.UniqueField + typ string +} + +func TestSchema_Merge(t *testing.T) { + const expectedConflicts = 2 + s1Fill := []filler{ + {typecheck.UniqueField{"db1", "rp1", "m1", "f1"}, "integer"}, + {typecheck.UniqueField{"db2", "rp1", "m1", "f1"}, "float"}, + {typecheck.UniqueField{"db1", "rp2", "m1", "f1"}, "string"}, + {typecheck.UniqueField{"db1", "rp1", "m2", "f1"}, "string"}, + {typecheck.UniqueField{"db1", "rp1", "m1", "f2"}, "float"}, + {typecheck.UniqueField{"db2", "rp2", "m2", "f2"}, "integer"}, + } + + s2Fill := []filler{ + {typecheck.UniqueField{"db1", "rp1", "m1", "f1"}, "integer"}, + {typecheck.UniqueField{"db2", "rp1", "m1", "f1"}, "string"}, + {typecheck.UniqueField{"db2", "rp2", "m2", "f2"}, "float"}, + {typecheck.UniqueField{"db1", "rp2", "m1", "f1"}, "string"}, + {typecheck.UniqueField{"db1", "rp1", "m2", "f1"}, "string"}, + {typecheck.UniqueField{"db1", "rp1", "m1", "f2"}, "float"}, + {typecheck.UniqueField{"db2", "rp2", "m2", "f2"}, "integer"}, + } + + s1 := typecheck.NewSchema() + s2 := typecheck.NewSchema() + fillSchema(s1, s1Fill) + fillSchema(s2, s2Fill) + + s1.Merge(s2) + conflicts := s1.Conflicts() + + assert.Len(t, conflicts, expectedConflicts, "wrong number of type conflicts detected: expected %d, got %d", expectedConflicts, len(conflicts)) +} + +func fillSchema(s typecheck.Schema, fill []filler) { + for _, f := range fill { + s.AddFormattedField(f.String(), f.typ) + } +}