Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSI Index Migration Tool #8669

Merged
merged 1 commit into from
Aug 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- [#8525](https://github.com/influxdata/influxdb/issues/8525): Support http pipelining for /query endpoint.
- [#8652](https://github.com/influxdata/influxdb/pull/8652): Reduce allocations when reading data
- [#8592](https://github.com/influxdata/influxdb/pull/8592): Mutex profiles are now available.
- [#8669](https://github.com/influxdata/influxdb/pull/8669): TSI Index Migration Tool

### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions cmd/influx_inspect/help/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The commands are:
dumptsi dumps low-level details about tsi1 files.
dumptsm dumps low-level details about tsm1 files.
export exports raw data from a shard to line protocol
inmem2tsi generates a tsi1 index from an in-memory index shard
help display this help message
report displays a shard level report
verify verifies integrity of TSM files
Expand Down
164 changes: 164 additions & 0 deletions cmd/influx_inspect/inmem2tsi/inmem2tsi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Package inmem2tsi reads an in-memory index and exports it as a TSI index.
package inmem2tsi

import (
"errors"
"flag"
"fmt"
"io"
"os"
"path/filepath"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
"github.com/uber-go/zap"
)

// Command holds the output streams and logger used while executing
// "influx_inspect inmem2tsi".
type Command struct {
	// Stderr and Stdout are the writers available to the command.
	// Usage and flag-parsing output is written to Stdout.
	Stderr, Stdout io.Writer

	// Logger receives the progress messages emitted during the conversion.
	Logger zap.Logger
}

// NewCommand builds a Command wired to the process's standard streams and
// a logger backed by zap's null encoder.
func NewCommand() *Command {
	cmd := new(Command)
	cmd.Stderr = os.Stderr
	cmd.Stdout = os.Stdout
	cmd.Logger = zap.New(zap.NullEncoder())
	return cmd
}

// Run parses the command-line arguments and executes the conversion.
//
// Both -path and -wal-path are required and positional arguments are not
// accepted; when that is violated the usage text is printed and flag.ErrHelp
// is returned. Passing -v replaces cmd.Logger with a text logger that writes
// to cmd.Stderr.
func (cmd *Command) Run(args ...string) error {
	fs := flag.NewFlagSet("inmem2tsi", flag.ExitOnError)
	path := fs.String("path", "", "data path")
	walPath := fs.String("wal-path", "", "WAL path")
	verbose := fs.Bool("v", false, "verbose")
	fs.SetOutput(cmd.Stdout)
	fs.Usage = cmd.printUsage
	if err := fs.Parse(args); err != nil {
		return err
	}
	if fs.NArg() > 0 || *path == "" || *walPath == "" {
		cmd.printUsage()
		return flag.ErrHelp
	}

	if *verbose {
		// Honor the command's configured error stream rather than writing
		// to the process-wide os.Stderr, so callers that replace Stderr
		// (e.g. tests) also capture the verbose log output.
		cmd.Logger = zap.New(
			zap.NewTextEncoder(),
			zap.Output(zap.AddSync(cmd.Stderr)),
		)
	}

	return cmd.run(*path, *walPath, *verbose)
}

// run converts the in-memory index of the shard at path into a TSI index.
//
// The shard is opened with the in-memory index, every series of every
// measurement is inserted into a new tsi1 index built under the temporary
// ".index" directory inside the shard, and once that index has been
// compacted and closed it is renamed to its final "index" location.
//
// An error is returned if the shard already has an "index" entry, if the
// index path cannot be inspected, or if any step of the conversion fails.
// The verbose argument is currently unused; output is governed by cmd.Logger.
func (cmd *Command) run(path, walPath string, verbose bool) error {
	// Check if shard already has a TSI index.
	indexPath := filepath.Join(path, "index")
	cmd.Logger.Info("checking index path", zap.String("path", indexPath))
	_, statErr := os.Stat(indexPath)
	switch {
	case statErr == nil:
		return errors.New("tsi1 index already exists")
	case !os.IsNotExist(statErr):
		// Surface unexpected failures (e.g. permission denied) as-is instead
		// of misreporting them as an existing index.
		return statErr
	}

	cmd.Logger.Info("opening shard", zap.String("path", path), zap.String("wal-path", walPath))

	// Open shard at path.
	sh := tsdb.NewShard(0, path, walPath, tsdb.EngineOptions{
		EngineVersion: tsdb.DefaultEngine,
		IndexVersion:  inmem.IndexName,
		InmemIndex:    inmem.NewIndex(""),
	})
	if err := sh.Open(); err != nil {
		return err
	}
	defer sh.CloseFast()

	cmd.Logger.Info("reading in-memory index")

	// Retrieve in-memory index reference.
	inmemIndex, ok := sh.Index().(*inmem.ShardIndex)
	if !ok {
		return fmt.Errorf("invalid source index type: %T", sh.Index())
	}

	// Remove temporary index files if this is being re-run.
	tmpPath := filepath.Join(path, ".index")
	cmd.Logger.Info("cleaning up partial index from previous run, if any")
	if err := os.RemoveAll(tmpPath); err != nil {
		return err
	}

	// Open TSI index in temporary path.
	tsiIndex := tsi1.NewIndex()
	tsiIndex.Path = tmpPath
	tsiIndex.WithLogger(cmd.Logger)
	cmd.Logger.Info("opening tsi index in temporary location", zap.String("path", tmpPath))
	if err := tsiIndex.Open(); err != nil {
		return err
	}

	// Close the index on early (error) returns only; the success path closes
	// it explicitly below so that the close error can be reported and the
	// index is not closed a second time.
	indexClosed := false
	defer func() {
		if !indexClosed {
			tsiIndex.Close()
		}
	}()

	cmd.Logger.Info("iterating over measurements")

	// Iterate over each series & insert into new index.
	if err := inmemIndex.ForEachMeasurementName(func(name []byte) error {
		cmd.Logger.Info("processing measurement", zap.String("name", string(name)))

		mm, err := inmemIndex.Measurement(name)
		if err != nil {
			return err
		}
		if mm == nil {
			// Nothing to copy for this measurement name.
			return nil
		}

		return mm.ForEachSeriesByExpr(nil, func(tags models.Tags) error {
			cmd.Logger.Info("series", zap.String("name", string(name)), zap.String("tags", tags.String()))
			if err := tsiIndex.CreateSeriesIfNotExists(nil, name, tags); err != nil {
				return fmt.Errorf("cannot create series: %s %s (%s)", name, tags.String(), err)
			}
			return nil
		})
	}); err != nil {
		return err
	}

	cmd.Logger.Info("compacting index")

	// Attempt to compact the index & wait for all compactions to complete.
	tsiIndex.Compact()
	tsiIndex.Wait()

	cmd.Logger.Info("closing tsi index")

	// Close TSI index. Mark it closed first so the deferred cleanup does not
	// attempt a second close regardless of the outcome.
	indexClosed = true
	if err := tsiIndex.Close(); err != nil {
		return err
	}

	cmd.Logger.Info("moving tsi to permanent location")

	// Rename TSI to standard path.
	return os.Rename(tmpPath, indexPath)
}

// printUsage writes the command's usage text to cmd.Stdout.
func (cmd *Command) printUsage() {
	usage := `Converts a shard from an in-memory index to a TSI index.

Usage: influx_inspect inmem2tsi -path DATA_PATH -wal-path WAL_PATH
`

	// Use Fprint rather than Fprintf: the usage text is data, not a format
	// string, so any '%' it may contain must be printed verbatim.
	fmt.Fprint(cmd.Stdout, usage)
}
6 changes: 6 additions & 0 deletions cmd/influx_inspect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/influxdata/influxdb/cmd/influx_inspect/dumptsm"
"github.com/influxdata/influxdb/cmd/influx_inspect/export"
"github.com/influxdata/influxdb/cmd/influx_inspect/help"
"github.com/influxdata/influxdb/cmd/influx_inspect/inmem2tsi"
"github.com/influxdata/influxdb/cmd/influx_inspect/report"
"github.com/influxdata/influxdb/cmd/influx_inspect/verify"
_ "github.com/influxdata/influxdb/tsdb/engine"
Expand Down Expand Up @@ -72,6 +73,11 @@ func (m *Main) Run(args ...string) error {
if err := name.Run(args...); err != nil {
return fmt.Errorf("export: %s", err)
}
case "inmem2tsi":
name := inmem2tsi.NewCommand()
if err := name.Run(args...); err != nil {
return fmt.Errorf("inmem2tsi: %s", err)
}
case "report":
name := report.NewCommand()
if err := name.Run(args...); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ func (i *Index) deleteNonManifestFiles(m *Manifest) error {
return nil
}

// Wait returns once outstanding compactions have finished.
// It blocks on i.wg; presumably that is the wait group tracking the index's
// background compaction goroutines (its declaration is not visible here).
func (i *Index) Wait() {
i.wg.Wait()
}

// Close closes the index.
func (i *Index) Close() error {
// Wait for goroutines to finish.
Expand Down
6 changes: 6 additions & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,12 @@ func (s *Shard) UnloadIndex() {
s.index.RemoveShard(s.id)
}

// Index returns a reference to the underlying index.
// The shard's own s.index value is returned directly, with no readiness
// check and no locking performed by this method.
// This should only be used by utilities and not directly accessed by the database.
func (s *Shard) Index() Index {
return s.index
}

// IsIdle return true if the shard is not receiving writes and is fully compacted.
func (s *Shard) IsIdle() bool {
if err := s.ready(); err != nil {
Expand Down