Skip to content

Commit

Permalink
feat: port check-schema and merge-schema from 1.x (#23921)
Browse files Browse the repository at this point in the history
* feat: add check-schema and merge-schema commands to influx inspect

* chore: fix linting

* fix: add warning if fields.idxl is encountered
  • Loading branch information
jeffreyssmith2nd committed Nov 21, 2022
1 parent 9bf8840 commit 7708108
Show file tree
Hide file tree
Showing 5 changed files with 472 additions and 0 deletions.
14 changes: 14 additions & 0 deletions cmd/influxd/inspect/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_lp"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsi"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsm"
typecheck "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/type_conflicts"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_seriesfile"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tombstone"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tsm"
Expand All @@ -33,6 +34,17 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) {
if err != nil {
return nil, err
}

checkSchema, err := typecheck.NewCheckSchemaCommand(v)
if err != nil {
return nil, err
}

mergeSchema, err := typecheck.NewMergeSchemaCommand(v)
if err != nil {
return nil, err
}

base.AddCommand(exportLp)
base.AddCommand(report_tsi.NewReportTSICommand())
base.AddCommand(export_index.NewExportIndexCommand())
Expand All @@ -46,6 +58,8 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) {
base.AddCommand(verify_wal.NewVerifyWALCommand())
base.AddCommand(report_tsm.NewReportTSMCommand())
base.AddCommand(build_tsi.NewBuildTSICommand())
base.AddCommand(checkSchema)
base.AddCommand(mergeSchema)

return base, nil
}
155 changes: 155 additions & 0 deletions cmd/influxd/inspect/type_conflicts/check_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package typecheck

import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"strings"

"github.com/influxdata/influxdb/v2/kit/cli"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type TypeConflictChecker struct {
Path string
SchemaFile string
ConflictsFile string
Logger *zap.Logger

logLevel zapcore.Level
}

func NewCheckSchemaCommand(v *viper.Viper) (*cobra.Command, error) {
flags := TypeConflictChecker{}

cmd := &cobra.Command{
Use: "check-schema",
Short: "Check for conflicts in the types between shards",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return checkSchemaRunE(cmd, flags)
},
}
opts := []cli.Opt{
{
DestP: &flags.Path,
Flag: "path",
Desc: "Path under which fields.idx files are located",
Default: ".",
},
{
DestP: &flags.SchemaFile,
Flag: "schema-file",
Desc: "Filename schema data should be written to",
Default: "schema.json",
},
{
DestP: &flags.ConflictsFile,
Flag: "conflicts-file",
Desc: "Filename conflicts data should be written to",
Default: "conflicts.json",
},
{
DestP: &flags.logLevel,
Flag: "log-level",
Desc: "The level of logging used througout the command",
Default: zap.InfoLevel,
},
}

if err := cli.BindOptions(v, cmd, opts); err != nil {
return nil, err
}
return cmd, nil
}

func checkSchemaRunE(_ *cobra.Command, tc TypeConflictChecker) error {
logconf := zap.NewProductionConfig()
logconf.Level = zap.NewAtomicLevelAt(tc.logLevel)
logger, err := logconf.Build()
if err != nil {
return err
}
tc.Logger = logger

// Get a set of every measurement/field/type tuple present.
var schema Schema
schema, err = tc.readFields()
if err != nil {
return err
}

if err := schema.WriteSchemaFile(tc.SchemaFile); err != nil {
return err
}
if err := schema.WriteConflictsFile(tc.ConflictsFile); err != nil {
return err
}

return nil
}

func (tc *TypeConflictChecker) readFields() (Schema, error) {
schema := NewSchema()
var root string
fi, err := os.Stat(tc.Path)
if err != nil {
return nil, err
}
if fi.IsDir() {
root = tc.Path
} else {
root = path.Dir(tc.Path)
}
fileSystem := os.DirFS(".")
err = fs.WalkDir(fileSystem, root, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return fmt.Errorf("error walking file: %w", err)
}

if filepath.Base(path) == tsdb.FieldsChangeFile {
fmt.Printf("WARN: A %s file was encountered at %s. The database was not shutdown properly, results of this command may be incomplete\n",
tsdb.FieldsChangeFile,
path,
)
return nil
}

if filepath.Base(path) != "fields.idx" {
return nil
}

dirs := strings.Split(path, string(os.PathSeparator))
bucket := dirs[len(dirs)-4]
rp := dirs[len(dirs)-3]
fmt.Printf("Processing %s\n", path)

mfs, err := tsdb.NewMeasurementFieldSet(path, tc.Logger)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return fmt.Errorf("unable to open file %q: %w", path, err)
}
defer mfs.Close()

measurements := mfs.MeasurementNames()
for _, m := range measurements {
for f, typ := range mfs.FieldsByString(m).FieldSet() {
schema.AddField(bucket, rp, m, f, typ.String())
}
}

return nil
})

return schema, err
}
76 changes: 76 additions & 0 deletions cmd/influxd/inspect/type_conflicts/merge_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package typecheck

import (
"errors"

"github.com/influxdata/influxdb/v2/kit/cli"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

type MergeFilesCommand struct {
OutputFile string
ConflictsFile string
}

func NewMergeSchemaCommand(v *viper.Viper) (*cobra.Command, error) {
flags := MergeFilesCommand{}

cmd := &cobra.Command{
Use: "merge-schema",
Short: "Merge a set of schema files from the check-schema command",
Args: cobra.MinimumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
return mergeSchemaRunE(cmd, args, flags)
},
}

opts := []cli.Opt{
{
DestP: &flags.OutputFile,
Flag: "schema-file",
Desc: "Filename for the output file",
Default: "schema.json",
},
{
DestP: &flags.ConflictsFile,
Flag: "conflicts-file",
Desc: "Filename conflicts data should be written to",
Default: "conflicts.json",
},
}

if err := cli.BindOptions(v, cmd, opts); err != nil {
return nil, err
}
return cmd, nil
}

func mergeSchemaRunE(_ *cobra.Command, args []string, mf MergeFilesCommand) error {
return mf.mergeFiles(args)
}

func (rc *MergeFilesCommand) mergeFiles(filenames []string) error {
if len(filenames) < 1 {
return errors.New("at least 1 file must be specified")
}

schema, err := SchemaFromFile(filenames[0])
if err != nil {
return err
}

for _, filename := range filenames[1:] {
other, err := SchemaFromFile(filename)
if err != nil {
return err
}
schema.Merge(other)
}

if err := schema.WriteConflictsFile(rc.ConflictsFile); err != nil {
return err
}

return schema.WriteSchemaFile(rc.OutputFile)
}
Loading

0 comments on commit 7708108

Please sign in to comment.