diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d315cb9ac3..48a78f65776 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - [#8525](https://github.com/influxdata/influxdb/issues/8525): Support http pipelining for /query endpoint. - [#8652](https://github.com/influxdata/influxdb/pull/8652): Reduce allocations when reading data - [#8592](https://github.com/influxdata/influxdb/pull/8592): Mutex profiles are now available. +- [#8669](https://github.com/influxdata/influxdb/pull/8669): TSI Index Migration Tool ### Bugfixes diff --git a/cmd/influx_inspect/help/help.go b/cmd/influx_inspect/help/help.go index 3610fe43d49..3b66ad25b39 100644 --- a/cmd/influx_inspect/help/help.go +++ b/cmd/influx_inspect/help/help.go @@ -34,6 +34,7 @@ The commands are: dumptsi dumps low-level details about tsi1 files. dumptsm dumps low-level details about tsm1 files. export exports raw data from a shard to line protocol + inmem2tsi generates a tsi1 index from an in-memory index shard help display this help message report displays a shard level report verify verifies integrity of TSM files diff --git a/cmd/influx_inspect/inmem2tsi/inmem2tsi.go b/cmd/influx_inspect/inmem2tsi/inmem2tsi.go new file mode 100644 index 00000000000..70c742fe52d --- /dev/null +++ b/cmd/influx_inspect/inmem2tsi/inmem2tsi.go @@ -0,0 +1,164 @@ +// Package inmem2tsi reads an in-memory index and exports it as a TSI index. +package inmem2tsi + +import ( + "errors" + "flag" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb" + "github.com/influxdata/influxdb/tsdb/index/inmem" + "github.com/influxdata/influxdb/tsdb/index/tsi1" + "github.com/uber-go/zap" +) + +// Command represents the program execution for "influx_inspect inmem2tsi". +type Command struct { + Stderr io.Writer + Stdout io.Writer + Logger zap.Logger +} + +// NewCommand returns a new instance of Command. +func NewCommand() *Command { + return &Command{ + Stderr: os.Stderr, + Stdout: os.Stdout, + Logger: zap.New(zap.NullEncoder()), + } +} + +// Run executes the command. +func (cmd *Command) Run(args ...string) error { + fs := flag.NewFlagSet("inmem2tsi", flag.ExitOnError) + path := fs.String("path", "", "data path") + walPath := fs.String("wal-path", "", "WAL path") + verbose := fs.Bool("v", false, "verbose") + fs.SetOutput(cmd.Stdout) + fs.Usage = cmd.printUsage + if err := fs.Parse(args); err != nil { + return err + } else if fs.NArg() > 0 || *path == "" || *walPath == "" { + cmd.printUsage() + return flag.ErrHelp + } + + if *verbose { + cmd.Logger = zap.New( + zap.NewTextEncoder(), + zap.Output(os.Stderr), + ) + } + + return cmd.run(*path, *walPath, *verbose) +} + +func (cmd *Command) run(path, walPath string, verbose bool) error { + // Check if shard already has a TSI index. + indexPath := filepath.Join(path, "index") + cmd.Logger.Info("checking index path", zap.String("path", indexPath)) + if _, err := os.Stat(indexPath); !os.IsNotExist(err) { + return errors.New("tsi1 index already exists") + } + + cmd.Logger.Info("opening shard", zap.String("path", path), zap.String("wal-path", walPath)) + + // Open shard at path. + sh := tsdb.NewShard(0, path, walPath, tsdb.EngineOptions{ + EngineVersion: tsdb.DefaultEngine, + IndexVersion: inmem.IndexName, + InmemIndex: inmem.NewIndex(""), + }) + if err := sh.Open(); err != nil { + return err + } + defer sh.CloseFast() + + cmd.Logger.Info("reading in-memory index") + + // Retrieve in-memory index reference. + inmemIndex, ok := sh.Index().(*inmem.ShardIndex) + if !ok { + return fmt.Errorf("invalid source index type: %T", sh.Index()) + } + + // Remove temporary index files if this is being re-run. + tmpPath := filepath.Join(path, ".index") + cmd.Logger.Info("cleaning up partial index from previous run, if any") + if err := os.RemoveAll(tmpPath); err != nil { + return err + } + + // Open TSI index in temporary path. + tsiIndex := tsi1.NewIndex() + tsiIndex.Path = tmpPath + tsiIndex.WithLogger(cmd.Logger) + cmd.Logger.Info("opening tsi index in temporary location", zap.String("path", tmpPath)) + if err := tsiIndex.Open(); err != nil { + return err + } + defer tsiIndex.Close() + + cmd.Logger.Info("iterating over measurements") + + // Iterate over each series & insert into new index. + if err := inmemIndex.ForEachMeasurementName(func(name []byte) error { + cmd.Logger.Info("processing measurement", zap.String("name", string(name))) + + mm, err := inmemIndex.Measurement(name) + if err != nil { + return err + } else if mm == nil { + return nil + } + + if err := mm.ForEachSeriesByExpr(nil, func(tags models.Tags) error { + cmd.Logger.Info("series", zap.String("name", string(name)), zap.String("tags", tags.String())) + if err := tsiIndex.CreateSeriesIfNotExists(nil, name, tags); err != nil { + return fmt.Errorf("cannot create series: %s %s (%s)", name, tags.String(), err) + } + return nil + }); err != nil { + return err + } + return nil + + }); err != nil { + return err + } + + cmd.Logger.Info("compacting index") + + // Attempt to compact the index & wait for all compactions to complete. + tsiIndex.Compact() + tsiIndex.Wait() + + cmd.Logger.Info("closing tsi index") + + // Close TSI index. + if err := tsiIndex.Close(); err != nil { + return err + } + + cmd.Logger.Info("moving tsi to permanent location") + + // Rename TSI to standard path. + if err := os.Rename(tmpPath, indexPath); err != nil { + return err + } + + return nil +} + +func (cmd *Command) printUsage() { + usage := `Converts a shard from an in-memory index to a TSI index. + +Usage: influx_inspect inmem2tsi -path DATA_PATH -wal-path WAL_PATH +` + + fmt.Fprintf(cmd.Stdout, usage) +} diff --git a/cmd/influx_inspect/main.go b/cmd/influx_inspect/main.go index 8f2434ed595..82b82a4118f 100644 --- a/cmd/influx_inspect/main.go +++ b/cmd/influx_inspect/main.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/influxdb/cmd/influx_inspect/dumptsm" "github.com/influxdata/influxdb/cmd/influx_inspect/export" "github.com/influxdata/influxdb/cmd/influx_inspect/help" + "github.com/influxdata/influxdb/cmd/influx_inspect/inmem2tsi" "github.com/influxdata/influxdb/cmd/influx_inspect/report" "github.com/influxdata/influxdb/cmd/influx_inspect/verify" _ "github.com/influxdata/influxdb/tsdb/engine" @@ -72,6 +73,11 @@ func (m *Main) Run(args ...string) error { if err := name.Run(args...); err != nil { return fmt.Errorf("export: %s", err) } + case "inmem2tsi": + name := inmem2tsi.NewCommand() + if err := name.Run(args...); err != nil { + return fmt.Errorf("inmem2tsi: %s", err) + } case "report": name := report.NewCommand() if err := name.Run(args...); err != nil { diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 0b32008c8bf..090b753888d 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -242,6 +242,11 @@ func (i *Index) deleteNonManifestFiles(m *Manifest) error { return nil } +// Wait returns once outstanding compactions have finished. +func (i *Index) Wait() { + i.wg.Wait() +} + // Close closes the index. func (i *Index) Close() error { // Wait for goroutines to finish. diff --git a/tsdb/shard.go b/tsdb/shard.go index c5a8b1c51a0..5c15512b7b6 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -401,6 +401,12 @@ func (s *Shard) UnloadIndex() { s.index.RemoveShard(s.id) } +// Index returns a reference to the underlying index. +// This should only be used by utilities and not directly accessed by the database. +func (s *Shard) Index() Index { + return s.index +} + // IsIdle return true if the shard is not receiving writes and is fully compacted. func (s *Shard) IsIdle() bool { if err := s.ready(); err != nil {