From 489c89bea4d3ece5c4b729934225ac6ba6bd9029 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 5 May 2017 15:06:07 -0600 Subject: [PATCH] Add tsi support tooling. --- CHANGELOG.md | 1 + cmd/influx_inspect/dumptsi/dumptsi.go | 469 ++++++++++++++++++++++++++ cmd/influx_inspect/main.go | 6 + tsdb/index/tsi1/file_set.go | 11 + tsdb/index/tsi1/index.go | 10 +- tsdb/index/tsi1/index_file.go | 9 + tsdb/index/tsi1/log_file.go | 29 ++ tsdb/index/tsi1/measurement_block.go | 9 + tsdb/index/tsi1/tag_block.go | 6 + 9 files changed, 548 insertions(+), 2 deletions(-) create mode 100644 cmd/influx_inspect/dumptsi/dumptsi.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 2daa00d4060..c12d9cd493a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio - [#8273](https://github.com/influxdata/influxdb/issues/8273): Remove the admin UI. - [#8327](https://github.com/influxdata/influxdb/pull/8327): Update to go1.8.1 - [#8348](https://github.com/influxdata/influxdb/pull/8348): Add max concurrent compaction limits +- [#8366](https://github.com/influxdata/influxdb/pull/8366): Add TSI support tooling. ### Bugfixes diff --git a/cmd/influx_inspect/dumptsi/dumptsi.go b/cmd/influx_inspect/dumptsi/dumptsi.go new file mode 100644 index 00000000000..a52813e918e --- /dev/null +++ b/cmd/influx_inspect/dumptsi/dumptsi.go @@ -0,0 +1,469 @@ +// Package dumptsi inspects low-level details about tsi1 files. +package dumptsi + +import ( + "flag" + "fmt" + "io" + "os" + "path/filepath" + "regexp" + "text/tabwriter" + + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb/index/tsi1" +) + +// Command represents the program execution for "influxd dumptsi". +type Command struct { + // Standard input/output, overridden for testing. + Stderr io.Writer + Stdout io.Writer + + paths []string + + showSeries bool + showMeasurements bool + showTagKeys bool + showTagValues bool + showTagValueSeries bool + + measurementFilter *regexp.Regexp + tagKeyFilter *regexp.Regexp + tagValueFilter *regexp.Regexp +} + +// NewCommand returns a new instance of Command. +func NewCommand() *Command { + return &Command{ + Stderr: os.Stderr, + Stdout: os.Stdout, + } +} + +// Run executes the command. +func (cmd *Command) Run(args ...string) error { + var measurementFilter, tagKeyFilter, tagValueFilter string + fs := flag.NewFlagSet("dumptsi", flag.ExitOnError) + fs.BoolVar(&cmd.showSeries, "series", false, "Show raw series data") + fs.BoolVar(&cmd.showMeasurements, "measurements", false, "Show raw measurement data") + fs.BoolVar(&cmd.showTagKeys, "tag-keys", false, "Show raw tag key data") + fs.BoolVar(&cmd.showTagValues, "tag-values", false, "Show raw tag value data") + fs.BoolVar(&cmd.showTagValueSeries, "tag-value-series", false, "Show raw series data for each value") + fs.StringVar(&measurementFilter, "measurement-filter", "", "Regex measurement filter") + fs.StringVar(&tagKeyFilter, "tag-key-filter", "", "Regex tag key filter") + fs.StringVar(&tagValueFilter, "tag-value-filter", "", "Regex tag value filter") + fs.SetOutput(cmd.Stdout) + fs.Usage = cmd.printUsage + if err := fs.Parse(args); err != nil { + return err + } + + // Parse filters. + if measurementFilter != "" { + re, err := regexp.Compile(measurementFilter) + if err != nil { + return err + } + cmd.measurementFilter = re + } + if tagKeyFilter != "" { + re, err := regexp.Compile(tagKeyFilter) + if err != nil { + return err + } + cmd.tagKeyFilter = re + } + if tagValueFilter != "" { + re, err := regexp.Compile(tagValueFilter) + if err != nil { + return err + } + cmd.tagValueFilter = re + } + + cmd.paths = fs.Args() + if len(cmd.paths) == 0 { + fmt.Printf("at least one path required\n\n") + fs.Usage() + return nil + } + + // Some flags imply other flags. + if cmd.showTagValueSeries { + cmd.showTagValues = true + } + if cmd.showTagValues { + cmd.showTagKeys = true + } + if cmd.showTagKeys { + cmd.showMeasurements = true + } + + return cmd.run() +} + +func (cmd *Command) run() error { + // Build a file set from the paths on the command line. + idx, fs, err := cmd.readFileSet() + if err != nil { + return err + } + + if idx != nil { + defer idx.Close() + } else { + defer fs.Close() + } + defer fs.Release() + + // Show either raw data or summary stats. + if cmd.showSeries || cmd.showMeasurements { + if err := cmd.printMerged(fs); err != nil { + return err + } + } else { + if err := cmd.printFileSummaries(fs); err != nil { + return err + } + } + + return nil +} + +func (cmd *Command) readFileSet() (*tsi1.Index, tsi1.FileSet, error) { + // If only one path exists and it's a directory then open as an index. + if len(cmd.paths) == 1 { + fi, err := os.Stat(cmd.paths[0]) + if err != nil { + return nil, nil, err + } else if fi.IsDir() { + idx := tsi1.NewIndex() + idx.Path = cmd.paths[0] + idx.CompactionEnabled = false + if err := idx.Open(); err != nil { + return nil, nil, err + } + return idx, idx.RetainFileSet(), nil + } + } + + // Open each file and group into a fileset. + var fs tsi1.FileSet + for _, path := range cmd.paths { + switch ext := filepath.Ext(path); ext { + case tsi1.LogFileExt: + f := tsi1.NewLogFile(path) + if err := f.Open(); err != nil { + return nil, nil, err + } + fs = append(fs, f) + + case tsi1.IndexFileExt: + f := tsi1.NewIndexFile() + f.SetPath(path) + if err := f.Open(); err != nil { + return nil, nil, err + } + fs = append(fs, f) + + default: + return nil, nil, fmt.Errorf("unexpected file extension: %s", ext) + } + } + + fs.Retain() + return nil, fs, nil +} + +func (cmd *Command) printMerged(fs tsi1.FileSet) error { + if err := cmd.printSeries(fs); err != nil { + return err + } else if err := cmd.printMeasurements(fs); err != nil { + return err + } + return nil +} + +func (cmd *Command) printSeries(fs tsi1.FileSet) error { + if !cmd.showSeries { + return nil + } + + // Print header. + tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0) + fmt.Fprintln(tw, "Series\t") + + // Iterate over each series. + itr := fs.SeriesIterator() + for e := itr.Next(); e != nil; e = itr.Next() { + name, tags := e.Name(), e.Tags() + + if !cmd.matchSeries(e.Name(), e.Tags()) { + continue + } + + fmt.Fprintf(tw, "%s%s\t%v\n", name, tags.HashKey(), deletedString(e.Deleted())) + } + + // Flush & write footer spacing. + if err := tw.Flush(); err != nil { + return err + } + fmt.Fprint(cmd.Stdout, "\n\n") + + return nil +} + +func (cmd *Command) printMeasurements(fs tsi1.FileSet) error { + if !cmd.showMeasurements { + return nil + } + + tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0) + fmt.Fprintln(tw, "Measurement\t") + + // Iterate over each series. + itr := fs.MeasurementIterator() + for e := itr.Next(); e != nil; e = itr.Next() { + if cmd.measurementFilter != nil && !cmd.measurementFilter.Match(e.Name()) { + continue + } + + fmt.Fprintf(tw, "%s\t%v\n", e.Name(), deletedString(e.Deleted())) + if err := tw.Flush(); err != nil { + return err + } + + if err := cmd.printTagKeys(fs, e.Name()); err != nil { + return err + } + } + + fmt.Fprint(cmd.Stdout, "\n\n") + + return nil +} + +func (cmd *Command) printTagKeys(fs tsi1.FileSet, name []byte) error { + if !cmd.showTagKeys { + return nil + } + + // Iterate over each key. + tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0) + itr := fs.TagKeyIterator(name) + for e := itr.Next(); e != nil; e = itr.Next() { + if cmd.tagKeyFilter != nil && !cmd.tagKeyFilter.Match(e.Key()) { + continue + } + + fmt.Fprintf(tw, " %s\t%v\n", e.Key(), deletedString(e.Deleted())) + if err := tw.Flush(); err != nil { + return err + } + + if err := cmd.printTagValues(fs, name, e.Key()); err != nil { + return err + } + } + fmt.Fprint(cmd.Stdout, "\n") + + return nil +} + +func (cmd *Command) printTagValues(fs tsi1.FileSet, name, key []byte) error { + if !cmd.showTagValues { + return nil + } + + // Iterate over each value. + tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0) + itr := fs.TagValueIterator(name, key) + for e := itr.Next(); e != nil; e = itr.Next() { + if cmd.tagValueFilter != nil && !cmd.tagValueFilter.Match(e.Value()) { + continue + } + + fmt.Fprintf(tw, " %s\t%v\n", e.Value(), deletedString(e.Deleted())) + if err := tw.Flush(); err != nil { + return err + } + + if err := cmd.printTagValueSeries(fs, name, key, e.Value()); err != nil { + return err + } + } + fmt.Fprint(cmd.Stdout, "\n") + + return nil +} + +func (cmd *Command) printTagValueSeries(fs tsi1.FileSet, name, key, value []byte) error { + if !cmd.showTagValueSeries { + return nil + } + + // Iterate over each series. + tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0) + itr := fs.TagValueSeriesIterator(name, key, value) + for e := itr.Next(); e != nil; e = itr.Next() { + if !cmd.matchSeries(e.Name(), e.Tags()) { + continue + } + + fmt.Fprintf(tw, " %s%s\n", e.Name(), e.Tags().HashKey()) + if err := tw.Flush(); err != nil { + return err + } + } + fmt.Fprint(cmd.Stdout, "\n") + + return nil +} + +func (cmd *Command) printFileSummaries(fs tsi1.FileSet) error { + for _, f := range fs { + switch f := f.(type) { + case *tsi1.LogFile: + if err := cmd.printLogFileSummary(f); err != nil { + return err + } + case *tsi1.IndexFile: + if err := cmd.printIndexFileSummary(f); err != nil { + return err + } + default: + panic("unreachable") + } + fmt.Fprintln(cmd.Stdout, "") + } + return nil +} + +func (cmd *Command) printLogFileSummary(f *tsi1.LogFile) error { + fmt.Fprintf(cmd.Stdout, "[LOG FILE] %s\n", filepath.Base(f.Path())) + tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0) + fmt.Fprintf(tw, "Series:\t%d\n", f.SeriesN()) + fmt.Fprintf(tw, "Measurements:\t%d\n", f.MeasurementN()) + fmt.Fprintf(tw, "Tag Keys:\t%d\n", f.TagKeyN()) + fmt.Fprintf(tw, "Tag Values:\t%d\n", f.TagValueN()) + return tw.Flush() +} + +func (cmd *Command) printIndexFileSummary(f *tsi1.IndexFile) error { + fmt.Fprintf(cmd.Stdout, "[INDEX FILE] %s\n", filepath.Base(f.Path())) + + // Calculate summary stats. + seriesN := f.SeriesN() + var measurementN, measurementSeriesN, measurementSeriesSize uint64 + var keyN uint64 + var valueN, valueSeriesN, valueSeriesSize uint64 + mitr := f.MeasurementIterator() + for me, _ := mitr.Next().(*tsi1.MeasurementBlockElem); me != nil; me, _ = mitr.Next().(*tsi1.MeasurementBlockElem) { + kitr := f.TagKeyIterator(me.Name()) + for ke, _ := kitr.Next().(*tsi1.TagBlockKeyElem); ke != nil; ke, _ = kitr.Next().(*tsi1.TagBlockKeyElem) { + vitr := f.TagValueIterator(me.Name(), ke.Key()) + for ve, _ := vitr.Next().(*tsi1.TagBlockValueElem); ve != nil; ve, _ = vitr.Next().(*tsi1.TagBlockValueElem) { + valueN++ + valueSeriesN += uint64(ve.SeriesN()) + valueSeriesSize += uint64(len(ve.SeriesData())) + } + keyN++ + } + measurementN++ + measurementSeriesN += uint64(me.SeriesN()) + measurementSeriesSize += uint64(len(me.SeriesData())) + } + + // Write stats. + tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0) + fmt.Fprintf(tw, "Series:\t%d\n", seriesN) + fmt.Fprintf(tw, "Measurements:\t%d\n", measurementN) + fmt.Fprintf(tw, " Series data size:\t%d (%s)\n", measurementSeriesSize, formatSize(measurementSeriesSize)) + fmt.Fprintf(tw, " Bytes per series:\t%.01fb\n", float64(measurementSeriesSize)/float64(measurementSeriesN)) + fmt.Fprintf(tw, "Tag Keys:\t%d\n", keyN) + fmt.Fprintf(tw, "Tag Values:\t%d\n", valueN) + fmt.Fprintf(tw, " Series:\t%d\n", valueSeriesN) + fmt.Fprintf(tw, " Series data size:\t%d (%s)\n", valueSeriesSize, formatSize(valueSeriesSize)) + fmt.Fprintf(tw, " Bytes per series:\t%.01fb\n", float64(valueSeriesSize)/float64(valueSeriesN)) + fmt.Fprintf(tw, "Avg tags per series:\t%.01f\n", float64(valueSeriesN)/float64(seriesN)) + if err := tw.Flush(); err != nil { + return err + } + + return nil +} + +// matchSeries returns true if the command filters matches the series. +func (cmd *Command) matchSeries(name []byte, tags models.Tags) bool { + // Filter by measurement. + if cmd.measurementFilter != nil && !cmd.measurementFilter.Match(name) { + return false + } + + // Filter by tag key/value. + if cmd.tagKeyFilter != nil || cmd.tagValueFilter != nil { + var matched bool + for _, tag := range tags { + if (cmd.tagKeyFilter == nil || cmd.tagKeyFilter.Match(tag.Key)) && (cmd.tagValueFilter == nil || cmd.tagValueFilter.Match(tag.Value)) { + matched = true + break + } + } + if !matched { + return false + } + } + + return true +} + +// printUsage prints the usage message to STDERR. +func (cmd *Command) printUsage() { + usage := `Dumps low-level details about tsi1 files. + +Usage: influx_inspect dumptsi [flags] path... + + -series + Dump raw series data + -measurements + Dump raw measurement data + -tag-keys + Dump raw tag keys + -tag-values + Dump raw tag values + -tag-value-series + Dump raw series for each tag value + -measurement-filter REGEXP + Filters data by measurement regular expression + -tag-key-filter REGEXP + Filters data by tag key regular expression + -tag-value-filter REGEXP + Filters data by tag value regular expression + +If no flags are specified then summary stats are provided for each file. +` + + fmt.Fprintf(cmd.Stdout, usage) +} + +// deletedString returns "(deleted)" if v is true. +func deletedString(v bool) string { + if v { + return "(deleted)" + } + return "" +} + +func formatSize(v uint64) string { + denom := uint64(1) + var uom string + for _, uom = range []string{"b", "kb", "mb", "gb", "tb"} { + if denom*1024 > v { + break + } + denom *= 1024 + } + return fmt.Sprintf("%0.01f%s", float64(v)/float64(denom), uom) +} diff --git a/cmd/influx_inspect/main.go b/cmd/influx_inspect/main.go index 02406a9f512..8f2434ed595 100644 --- a/cmd/influx_inspect/main.go +++ b/cmd/influx_inspect/main.go @@ -8,6 +8,7 @@ import ( "os" "github.com/influxdata/influxdb/cmd" + "github.com/influxdata/influxdb/cmd/influx_inspect/dumptsi" "github.com/influxdata/influxdb/cmd/influx_inspect/dumptsm" "github.com/influxdata/influxdb/cmd/influx_inspect/export" "github.com/influxdata/influxdb/cmd/influx_inspect/help" @@ -53,6 +54,11 @@ func (m *Main) Run(args ...string) error { if err := help.NewCommand().Run(args...); err != nil { return fmt.Errorf("help: %s", err) } + case "dumptsi": + name := dumptsi.NewCommand() + if err := name.Run(args...); err != nil { + return fmt.Errorf("dumptsi: %s", err) + } case "dumptsmdev": fmt.Fprintf(m.Stderr, "warning: dumptsmdev is deprecated, use dumptsm instead.\n") fallthrough diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index 8482ee5b0d3..05958830c16 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -17,6 +17,17 @@ import ( // FileSet represents a collection of files. type FileSet []File +// Close closes all the files in the file set. +func (p FileSet) Close() error { + var err error + for _, f := range p { + if e := f.Close(); e != nil && err == nil { + err = e + } + } + return err +} + // Retain adds a reference count to all files. func (p FileSet) Retain() { for _, f := range p { diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index f0b35c9f102..1f7644f48da 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -83,6 +83,7 @@ type Index struct { CompactionFactor float64 // Frequency of compaction checks. + CompactionEnabled bool CompactionMonitorInterval time.Duration } @@ -92,8 +93,9 @@ func NewIndex() *Index { closing: make(chan struct{}), // Default compaction thresholds. - MaxLogFileSize: DefaultMaxLogFileSize, - CompactionFactor: DefaultCompactionFactor, + MaxLogFileSize: DefaultMaxLogFileSize, + CompactionEnabled: true, + CompactionFactor: DefaultCompactionFactor, } } @@ -753,6 +755,10 @@ func (i *Index) Compact() { // compact compacts continguous groups of files that are not currently compacting. func (i *Index) compact() { + if !i.CompactionEnabled { + return + } + fs := i.retainFileSet() defer fs.Release() diff --git a/tsdb/index/tsi1/index_file.go b/tsdb/index/tsi1/index_file.go index 05540eb92c5..6c6723b44e0 100644 --- a/tsdb/index/tsi1/index_file.go +++ b/tsdb/index/tsi1/index_file.go @@ -195,6 +195,15 @@ func (f *IndexFile) Measurement(name []byte) MeasurementElem { return &e } +// MeasurementN returns the number of measurements in the file. +func (f *IndexFile) MeasurementN() (n uint64) { + mitr := f.mblk.Iterator() + for me := mitr.Next(); me != nil; me = mitr.Next() { + n++ + } + return n +} + // TagValueIterator returns a value iterator for a tag key and a flag // indicating if a tombstone exists on the measurement or key. func (f *IndexFile) TagValueIterator(name, key []byte) TagValueIterator { diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 68d8ff95cfc..4198f2dd35a 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -372,6 +372,35 @@ func (f *LogFile) TagValueSeriesIterator(name, key, value []byte) SeriesIterator return newLogSeriesIterator(tv.series) } +// MeasurementN returns the total number of measurements. +func (f *LogFile) MeasurementN() (n uint64) { + f.mu.RLock() + defer f.mu.RUnlock() + return uint64(len(f.mms)) +} + +// TagKeyN returns the total number of keys. +func (f *LogFile) TagKeyN() (n uint64) { + f.mu.RLock() + defer f.mu.RUnlock() + for _, mm := range f.mms { + n += uint64(len(mm.tagSet)) + } + return n +} + +// TagValueN returns the total number of values. +func (f *LogFile) TagValueN() (n uint64) { + f.mu.RLock() + defer f.mu.RUnlock() + for _, mm := range f.mms { + for _, k := range mm.tagSet { + n += uint64(len(k.tagValues)) + } + } + return n +} + // DeleteTagValue adds a tombstone for a tag value to the log file. func (f *LogFile) DeleteTagValue(name, key, value []byte) error { f.mu.Lock() diff --git a/tsdb/index/tsi1/measurement_block.go b/tsdb/index/tsi1/measurement_block.go index 5e26d1ff9b2..205b5620fca 100644 --- a/tsdb/index/tsi1/measurement_block.go +++ b/tsdb/index/tsi1/measurement_block.go @@ -321,6 +321,12 @@ func (e *MeasurementBlockElem) TagBlockOffset() int64 { return e.tagBlock.offset // TagBlockSize returns the size of the measurement's tag block. func (e *MeasurementBlockElem) TagBlockSize() int64 { return e.tagBlock.size } +// SeriesData returns the raw series data. +func (e *MeasurementBlockElem) SeriesData() []byte { return e.series.data } + +// SeriesN returns the number of series associated with the measurement. +func (e *MeasurementBlockElem) SeriesN() uint64 { return e.series.n } + // SeriesID returns series ID at an index. func (e *MeasurementBlockElem) SeriesID(i int) uint64 { return binary.BigEndian.Uint64(e.series.data[i*SeriesIDSize:]) @@ -335,6 +341,9 @@ func (e *MeasurementBlockElem) SeriesIDs() []uint64 { return a } +// Size returns the size of the element. +func (e *MeasurementBlockElem) Size() int { return e.size } + // UnmarshalBinary unmarshals data into e. func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error { start := len(data) diff --git a/tsdb/index/tsi1/tag_block.go b/tsdb/index/tsi1/tag_block.go index df23855fc62..b22b8a19333 100644 --- a/tsdb/index/tsi1/tag_block.go +++ b/tsdb/index/tsi1/tag_block.go @@ -316,6 +316,9 @@ func (e *TagBlockValueElem) Value() []byte { return e.value } // SeriesN returns the series count. func (e *TagBlockValueElem) SeriesN() uint64 { return e.series.n } +// SeriesData returns the raw series data. +func (e *TagBlockValueElem) SeriesData() []byte { return e.series.data } + // SeriesID returns series ID at an index. func (e *TagBlockValueElem) SeriesID(i int) uint64 { return binary.BigEndian.Uint64(e.series.data[i*SeriesIDSize:]) @@ -330,6 +333,9 @@ func (e *TagBlockValueElem) SeriesIDs() []uint64 { return a } +// Size returns the size of the element. +func (e *TagBlockValueElem) Size() int { return e.size } + // unmarshal unmarshals buf into e. func (e *TagBlockValueElem) unmarshal(buf []byte) { start := len(buf)