From c2eac861318a6328e987ed99529ce902a19440cb Mon Sep 17 00:00:00 2001 From: Jeffrey Smith II Date: Mon, 21 Nov 2022 11:23:13 -0500 Subject: [PATCH] feat: port report-db command from 1.x (#23922) * feat: port report-db command from 1.x * chore: fix linting * chore: rename db to bucket * chore: fix linting --- cmd/influxd/inspect/inspect.go | 7 + .../report_db/aggregators/aggregators.go | 242 +++++++++++++ .../report_db/aggregators/aggregators_test.go | 330 ++++++++++++++++++ cmd/influxd/inspect/report_db/report_db.go | 189 ++++++++++ cmd/influxd/inspect/report_tsm/report_tsm.go | 36 +- pkg/reporthelper/walkshards.go | 71 ++++ 6 files changed, 857 insertions(+), 18 deletions(-) create mode 100644 cmd/influxd/inspect/report_db/aggregators/aggregators.go create mode 100644 cmd/influxd/inspect/report_db/aggregators/aggregators_test.go create mode 100644 cmd/influxd/inspect/report_db/report_db.go create mode 100644 pkg/reporthelper/walkshards.go diff --git a/cmd/influxd/inspect/inspect.go b/cmd/influxd/inspect/inspect.go index fb2f9ee8c9c..43398b8676d 100644 --- a/cmd/influxd/inspect/inspect.go +++ b/cmd/influxd/inspect/inspect.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/dump_wal" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_index" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_lp" + "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_db" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsi" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsm" typecheck "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/type_conflicts" @@ -35,6 +36,11 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) { return nil, err } + reportDB, err := report_db.NewReportDBCommand(v) + if err != nil { + return nil, err + } + checkSchema, err := typecheck.NewCheckSchemaCommand(v) if err != nil { return nil, err @@ -58,6 +64,7 @@ func NewCommand(v *viper.Viper) (*cobra.Command, 
error) { base.AddCommand(verify_wal.NewVerifyWALCommand()) base.AddCommand(report_tsm.NewReportTSMCommand()) base.AddCommand(build_tsi.NewBuildTSICommand()) + base.AddCommand(reportDB) base.AddCommand(checkSchema) base.AddCommand(mergeSchema) diff --git a/cmd/influxd/inspect/report_db/aggregators/aggregators.go b/cmd/influxd/inspect/report_db/aggregators/aggregators.go new file mode 100644 index 00000000000..4040737d8be --- /dev/null +++ b/cmd/influxd/inspect/report_db/aggregators/aggregators.go @@ -0,0 +1,242 @@ +package aggregators + +import ( + "fmt" + "strings" + "sync" + "text/tabwriter" + + report "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsm" + "github.com/influxdata/influxdb/v2/models" +) + +type rollupNodeMap map[string]RollupNode + +type RollupNode interface { + sync.Locker + report.Counter + Children() rollupNodeMap + RecordSeries(bucket, rp, ms string, key, field []byte, tags models.Tags) + Print(tw *tabwriter.Writer, printTags bool, bucket, rp, ms string) error + isLeaf() bool + child(key string, isLeaf bool) NodeWrapper +} + +type NodeWrapper struct { + RollupNode +} + +var detailedHeader = []string{"bucket", "retention policy", "measurement", "series", "fields", "tag total", "tags"} +var simpleHeader = []string{"bucket", "retention policy", "measurement", "series"} + +type RollupNodeFactory struct { + header []string + EstTitle string + NewNode func(isLeaf bool) NodeWrapper + counter func() report.Counter +} + +var nodeFactory *RollupNodeFactory + +func CreateNodeFactory(detailed, exact bool) *RollupNodeFactory { + estTitle := " (est.)" + newCounterFn := report.NewHLLCounter + if exact { + newCounterFn = report.NewExactCounter + estTitle = "" + } + + if detailed { + nodeFactory = newDetailedNodeFactory(newCounterFn, estTitle) + } else { + nodeFactory = newSimpleNodeFactory(newCounterFn, estTitle) + } + return nodeFactory +} + +func (f *RollupNodeFactory) PrintHeader(tw *tabwriter.Writer) error { + _, err := fmt.Fprintln(tw, 
strings.Join(f.header, "\t")) + return err +} + +func (f *RollupNodeFactory) PrintDivider(tw *tabwriter.Writer) error { + divLine := f.makeTabDivider() + _, err := fmt.Fprintln(tw, divLine) + return err +} + +func (f *RollupNodeFactory) makeTabDivider() string { + div := make([]string, 0, len(f.header)) + for _, s := range f.header { + div = append(div, strings.Repeat("-", len(s))) + } + return strings.Join(div, "\t") +} + +func newSimpleNodeFactory(newCounterFn func() report.Counter, est string) *RollupNodeFactory { + return &RollupNodeFactory{ + header: simpleHeader, + EstTitle: est, + NewNode: func(isLeaf bool) NodeWrapper { return NodeWrapper{newSimpleNode(isLeaf, newCounterFn)} }, + counter: newCounterFn, + } +} + +func newDetailedNodeFactory(newCounterFn func() report.Counter, est string) *RollupNodeFactory { + return &RollupNodeFactory{ + header: detailedHeader, + EstTitle: est, + NewNode: func(isLeaf bool) NodeWrapper { return NodeWrapper{newDetailedNode(isLeaf, newCounterFn)} }, + counter: newCounterFn, + } +} + +type simpleNode struct { + sync.Mutex + report.Counter + rollupNodeMap +} + +func (s *simpleNode) Children() rollupNodeMap { + return s.rollupNodeMap +} + +func (s *simpleNode) child(key string, isLeaf bool) NodeWrapper { + if s.isLeaf() { + panic("Trying to get the child to a leaf node") + } + s.Lock() + defer s.Unlock() + c, ok := s.Children()[key] + if !ok { + c = nodeFactory.NewNode(isLeaf) + s.Children()[key] = c + } + return NodeWrapper{c} +} + +func (s *simpleNode) isLeaf() bool { + return s.Children() == nil +} + +func newSimpleNode(isLeaf bool, fn func() report.Counter) *simpleNode { + s := &simpleNode{Counter: fn()} + if !isLeaf { + s.rollupNodeMap = make(rollupNodeMap) + } else { + s.rollupNodeMap = nil + } + return s +} + +func (s *simpleNode) RecordSeries(bucket, rp, _ string, key, _ []byte, _ models.Tags) { + s.Lock() + defer s.Unlock() + s.recordSeriesNoLock(bucket, rp, key) +} + +func (s *simpleNode) recordSeriesNoLock(bucket, rp 
string, key []byte) { + s.Add([]byte(fmt.Sprintf("%s.%s.%s", bucket, rp, key))) +} + +func (s *simpleNode) Print(tw *tabwriter.Writer, _ bool, bucket, rp, ms string) error { + _, err := fmt.Fprintf(tw, "%s\t%s\t%s\t%d\n", + bucket, + rp, + ms, + s.Count()) + return err +} + +type detailedNode struct { + simpleNode + fields report.Counter + tags map[string]report.Counter +} + +func newDetailedNode(isLeaf bool, fn func() report.Counter) *detailedNode { + d := &detailedNode{ + simpleNode: simpleNode{ + Counter: fn(), + }, + fields: fn(), + tags: make(map[string]report.Counter), + } + if !isLeaf { + d.simpleNode.rollupNodeMap = make(rollupNodeMap) + } else { + d.simpleNode.rollupNodeMap = nil + } + return d +} + +func (d *detailedNode) RecordSeries(bucket, rp, ms string, key, field []byte, tags models.Tags) { + d.Lock() + defer d.Unlock() + d.simpleNode.recordSeriesNoLock(bucket, rp, key) + d.fields.Add([]byte(fmt.Sprintf("%s.%s.%s.%s", bucket, rp, ms, field))) + for _, t := range tags { + // Add database, retention policy, and measurement + // to correctly aggregate in inner (non-leaf) nodes + canonTag := fmt.Sprintf("%s.%s.%s.%s", bucket, rp, ms, t.Key) + tc, ok := d.tags[canonTag] + if !ok { + tc = nodeFactory.counter() + d.tags[canonTag] = tc + } + tc.Add(t.Value) + } +} + +func (d *detailedNode) Print(tw *tabwriter.Writer, printTags bool, bucket, rp, ms string) error { + seriesN := d.Count() + fieldsN := d.fields.Count() + var tagKeys []string + tagN := uint64(0) + + if printTags { + tagKeys = make([]string, 0, len(d.tags)) + } + for k, v := range d.tags { + c := v.Count() + tagN += c + if printTags { + tagKeys = append(tagKeys, fmt.Sprintf("%q: %d", k[strings.LastIndex(k, ".")+1:], c)) + } + } + _, err := fmt.Fprintf(tw, "%s\t%s\t%s\t%d\t%d\t%d\t%s\n", + bucket, + rp, + ms, + seriesN, + fieldsN, + tagN, + strings.Join(tagKeys, ", ")) + return err +} + +func (r *NodeWrapper) Record(depth, totalDepth int, bucket, rp, measurement string, key []byte, field []byte, 
tags models.Tags) { + r.RecordSeries(bucket, rp, measurement, key, field, tags) + + switch depth { + case 2: + if depth < totalDepth { + // Create measurement level in tree + c := r.child(measurement, true) + c.RecordSeries(bucket, rp, measurement, key, field, tags) + } + case 1: + if depth < totalDepth { + // Create retention policy level in tree + c := r.child(rp, (depth+1) == totalDepth) + c.Record(depth+1, totalDepth, bucket, rp, measurement, key, field, tags) + } + case 0: + if depth < totalDepth { + // Create bucket level in tree + c := r.child(bucket, (depth+1) == totalDepth) + c.Record(depth+1, totalDepth, bucket, rp, measurement, key, field, tags) + } + default: + } +} diff --git a/cmd/influxd/inspect/report_db/aggregators/aggregators_test.go b/cmd/influxd/inspect/report_db/aggregators/aggregators_test.go new file mode 100644 index 00000000000..0cb264228db --- /dev/null +++ b/cmd/influxd/inspect/report_db/aggregators/aggregators_test.go @@ -0,0 +1,330 @@ +package aggregators + +import ( + "bytes" + "sync" + "testing" + + "github.com/influxdata/influxdb/v2/models" + "github.com/stretchr/testify/require" +) + +type result struct { + fields uint64 + tags uint64 + series uint64 +} + +type test struct { + db string + rp string + key []byte +} + +// Ensure that tags and fields and series which differ only in database, retention policy, or measurement +// are correctly counted. 
+func Test_canonicalize(t *testing.T) { + totalDepth := 3 + + // measurement,tag1=tag1_value1,tag2=tag2_value1#!~#field1 + tests := []test{ + { + db: "db1", + rp: "rp1", + key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1"), + }, + { + db: "db1", + rp: "rp1", + key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1"), + }, + { + db: "db1", + rp: "rp1", + key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1"), + }, + { + db: "db1", + rp: "rp1", + key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f1"), + }, + { + db: "db1", + rp: "rp1", + key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2"), + }, + { + db: "db1", + rp: "rp2", + key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1"), + }, + { + db: "db1", + rp: "rp2", + key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1"), + }, + { + db: "db1", + rp: "rp2", + key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1"), + }, + { + db: "db1", + rp: "rp2", + key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f3"), + }, + { + db: "db1", + rp: "rp2", + key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2"), + }, + { + db: "db1", + rp: "rp1", + key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1"), + }, + { + db: "db1", + rp: "rp1", + key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1"), + }, + { + db: "db1", + rp: "rp1", + key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1"), + }, + { + db: "db1", + rp: "rp1", + key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1"), + }, + { + db: "db1", + rp: "rp1", + key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2"), + }, + { + db: "db1", + rp: "rp2", + key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1"), + }, + { + db: "db1", + rp: "rp2", + key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1"), + }, + { + db: "db1", + rp: "rp2", + key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1"), + }, + { + db: "db1", + rp: "rp2", + key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1"), + }, + { + db: "db1", + rp: "rp2", + key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2"), + }, + { + db: "db2", + rp: "rp1", + key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1"), + }, + { + db: "db2", + rp: "rp1", + key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1"), + }, + { + db: "db2", + rp: "rp1", + key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1"), + }, + { + db: "db2", + 
rp: "rp1", + key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f1"), + }, + { + db: "db2", + rp: "rp1", + key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2"), + }, + { + db: "db2", + rp: "rp2", + key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1"), + }, + { + db: "db2", + rp: "rp2", + key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1"), + }, + { + db: "db2", + rp: "rp2", + key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1"), + }, + { + db: "db2", + rp: "rp2", + key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f1"), + }, + { + db: "db2", + rp: "rp2", + key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2"), + }, + { + db: "db2", + rp: "rp1", + key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1"), + }, + { + db: "db2", + rp: "rp1", + key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1"), + }, + { + db: "db2", + rp: "rp1", + key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1"), + }, + { + db: "db2", + rp: "rp1", + key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1"), + }, + { + db: "db2", + rp: "rp1", + key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2"), + }, + { + db: "db2", + rp: "rp2", + key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1"), + }, + { + db: "db2", + rp: "rp2", + key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1"), + }, + { + db: "db2", + rp: "rp2", + key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1"), + }, + { + db: "db2", + rp: "rp2", + key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1"), + }, + { + db: "db2", + rp: "rp2", + key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2"), + }, + } + + results := map[string]map[string]map[string]*result{ + "db1": { + "rp1": { + "m1": {2, 4, 5}, + "m2": {2, 4, 5}, + "": {4, 8, 10}, + }, + "rp2": { + "m1": {3, 4, 5}, + "m2": {2, 4, 5}, + "": {5, 8, 10}, + }, + "": { + "": {9, 16, 20}, + }, + }, + "db2": { + "rp1": { + "m1": {2, 4, 5}, + "m2": {2, 4, 5}, + "": {4, 8, 10}, + }, + "rp2": { + "m1": {2, 4, 5}, + "m2": {2, 4, 5}, + "": {4, 8, 10}, + }, + "": { + "": {8, 16, 20}, + }, + }, + "": { + "": { + "": {17, 32, 40}, + }, + }, + } + + testLoop(t, false, true, totalDepth, tests, results) + testLoop(t, true, true, totalDepth, tests, results) + testLoop(t, false, false, totalDepth, tests, 
results) + testLoop(t, true, false, totalDepth, tests, results) + +} + +func testLoop(t *testing.T, detailed bool, exact bool, totalDepth int, tests []test, results map[string]map[string]map[string]*result) { + factory := CreateNodeFactory(detailed, exact) + tree := factory.NewNode(totalDepth == 0) + + wg := sync.WaitGroup{} + tf := func() { + for i := range tests { + seriesKey, field, _ := bytes.Cut(tests[i].key, []byte("#!~#")) + measurement, tags := models.ParseKey(seriesKey) + tree.Record(0, totalDepth, tests[i].db, tests[i].rp, measurement, tests[i].key, field, tags) + } + wg.Done() + } + const concurrency = 5 + wg.Add(concurrency) + for j := 0; j < concurrency; j++ { + go tf() + } + wg.Wait() + + for d, db := range tree.Children() { + for r, rp := range db.Children() { + for m, measure := range rp.Children() { + checkNode(t, measure, results[d][r][m], d, r, m) + } + checkNode(t, rp, results[d][r][""], d, r, "") + } + checkNode(t, db, results[d][""][""], d, "", "") + } + checkNode(t, tree, results[""][""][""], "", "", "") +} + +func checkNode(t *testing.T, measure RollupNode, results *result, d string, r string, m string) { + mr, ok := measure.(NodeWrapper) + if !ok { + t.Fatalf("internal error: expected a NodeWrapper type") + } + + switch node := mr.RollupNode.(type) { + case *detailedNode: + require.Equalf(t, results.series, node.Count(), "series count wrong. db: %q, rp: %q, ms: %q", d, r, m) + require.Equalf(t, results.fields, node.fields.Count(), "field count wrong. db: %q, rp: %q, ms: %q", d, r, m) + tagSum := uint64(0) + for _, t := range node.tags { + tagSum += t.Count() + } + require.Equalf(t, results.tags, tagSum, "tag value count wrong. db: %q, rp: %q, ms: %q", d, r, m) + case *simpleNode: + require.Equalf(t, results.series, node.Count(), "series count wrong. 
db: %q, rp: %q, ms: %q", d, r, m) + default: + t.Fatalf("internal error: unknown node type") + } +} diff --git a/cmd/influxd/inspect/report_db/report_db.go b/cmd/influxd/inspect/report_db/report_db.go new file mode 100644 index 00000000000..30bdd42480c --- /dev/null +++ b/cmd/influxd/inspect/report_db/report_db.go @@ -0,0 +1,189 @@ +package report_db + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "os" + "text/tabwriter" + + "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_db/aggregators" + "github.com/influxdata/influxdb/v2/kit/cli" + "github.com/influxdata/influxdb/v2/models" + "github.com/influxdata/influxdb/v2/pkg/reporthelper" + "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "golang.org/x/sync/errgroup" +) + +// ReportDB represents the program execution for "influxd report-db". +type ReportDB struct { + // Standard input/output, overridden for testing. + Stderr io.Writer + Stdout io.Writer + + dbPath string + exact bool + detailed bool + // How many goroutines to dedicate to calculating cardinality. 
+ concurrency int + // t, b, r, m for Total, Bucket, Retention Policy, Measurement + rollup string +} + +func NewReportDBCommand(v *viper.Viper) (*cobra.Command, error) { + flags := &ReportDB{ + Stderr: os.Stderr, + Stdout: os.Stdout, + } + + cmd := &cobra.Command{ + Use: "report-db", + Short: "Estimates cloud 2 cardinality for a database", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, _ []string) error { + return reportDBRunE(cmd, flags) + }, + } + opts := []cli.Opt{ + { + DestP: &flags.dbPath, + Flag: "db-path", + Desc: "path to database", + Required: true, + }, + { + DestP: &flags.concurrency, + Flag: "c", + Desc: "set worker concurrency, defaults to one", + Default: 1, + }, + { + DestP: &flags.detailed, + Flag: "detailed", + Desc: "include counts for fields, tags", + Default: false, + }, + { + DestP: &flags.exact, + Flag: "exact", + Desc: "report exact counts", + Default: false, + }, + { + DestP: &flags.rollup, + Flag: "rollup", + Desc: "rollup level - t: total, b: bucket, r: retention policy, m: measurement", + Default: "m", + }, + } + if err := cli.BindOptions(v, cmd, opts); err != nil { + return nil, err + } + return cmd, nil +} + +func reportDBRunE(_ *cobra.Command, reportdb *ReportDB) error { + var legalRollups = map[string]int{"m": 3, "r": 2, "b": 1, "t": 0} + if reportdb.dbPath == "" { + return errors.New("path to database must be provided") + } + + totalDepth, ok := legalRollups[reportdb.rollup] + + if !ok { + return fmt.Errorf("invalid rollup specified: %q", reportdb.rollup) + } + + factory := aggregators.CreateNodeFactory(reportdb.detailed, reportdb.exact) + totalsTree := factory.NewNode(totalDepth == 0) + + g, ctx := errgroup.WithContext(context.Background()) + g.SetLimit(reportdb.concurrency) + processTSM := func(bucket, rp, id, path string) error { + file, err := os.OpenFile(path, os.O_RDONLY, 0600) + if err != nil { + _, _ = fmt.Fprintf(reportdb.Stderr, "error: %s: %v. 
Skipping.\n", path, err) + return nil + } + + reader, err := tsm1.NewTSMReader(file) + if err != nil { + _, _ = fmt.Fprintf(reportdb.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err) + // NewTSMReader won't close the file handle on failure, so do it here. + _ = file.Close() + return nil + } + defer func() { + // The TSMReader will close the underlying file handle here. + if err := reader.Close(); err != nil { + _, _ = fmt.Fprintf(reportdb.Stderr, "error closing: %s: %v.\n", file.Name(), err) + } + }() + + seriesCount := reader.KeyCount() + for i := 0; i < seriesCount; i++ { + func() { + key, _ := reader.KeyAt(i) + seriesKey, field, _ := bytes.Cut(key, []byte("#!~#")) + measurement, tags := models.ParseKey(seriesKey) + totalsTree.Record(0, totalDepth, bucket, rp, measurement, key, field, tags) + }() + } + return nil + } + done := ctx.Done() + err := reporthelper.WalkShardDirs(reportdb.dbPath, func(bucket, rp, id, path string) error { + select { + case <-done: + return nil + default: + g.Go(func() error { + return processTSM(bucket, rp, id, path) + }) + return nil + } + }) + + if err != nil { + _, _ = fmt.Fprintf(reportdb.Stderr, "%s: %v\n", reportdb.dbPath, err) + return err + } + err = g.Wait() + if err != nil { + _, _ = fmt.Fprintf(reportdb.Stderr, "%s: %v\n", reportdb.dbPath, err) + return err + } + + tw := tabwriter.NewWriter(reportdb.Stdout, 8, 2, 1, ' ', 0) + + if err = factory.PrintHeader(tw); err != nil { + return err + } + if err = factory.PrintDivider(tw); err != nil { + return err + } + for d, bucket := range totalsTree.Children() { + for r, rp := range bucket.Children() { + for m, measure := range rp.Children() { + err = measure.Print(tw, true, fmt.Sprintf("%q", d), fmt.Sprintf("%q", r), fmt.Sprintf("%q", m)) + if err != nil { + return err + } + } + if err = rp.Print(tw, false, fmt.Sprintf("%q", d), fmt.Sprintf("%q", r), ""); err != nil { + return err + } + } + if err = bucket.Print(tw, false, fmt.Sprintf("%q", d), "", ""); err != nil { + return 
err + } + } + if err = totalsTree.Print(tw, false, "Total"+factory.EstTitle, "", ""); err != nil { + return err + } + return tw.Flush() +} diff --git a/cmd/influxd/inspect/report_tsm/report_tsm.go b/cmd/influxd/inspect/report_tsm/report_tsm.go index e6549d41c97..5f86e56de78 100644 --- a/cmd/influxd/inspect/report_tsm/report_tsm.go +++ b/cmd/influxd/inspect/report_tsm/report_tsm.go @@ -91,20 +91,20 @@ func (a *args) isShardDir(dir string) error { } func (a *args) Run(cmd *cobra.Command) error { - // Create the cardinality counter - newCounterFn := newHLLCounter + // Create the cardinality Counter + newCounterFn := NewHLLCounter estTitle := " (est)" if a.exact { estTitle = "" - newCounterFn = newExactCounter + newCounterFn = NewExactCounter } totalSeries := newCounterFn() - tagCardinalities := map[string]counter{} - measCardinalities := map[string]counter{} - fieldCardinalities := map[string]counter{} + tagCardinalities := map[string]Counter{} + measCardinalities := map[string]Counter{} + fieldCardinalities := map[string]Counter{} - dbCardinalities := map[string]counter{} + dbCardinalities := map[string]Counter{} start := time.Now() @@ -233,13 +233,13 @@ type printArgs struct { fileCount int minTime, maxTime int64 estTitle string - totalSeries counter + totalSeries Counter detailed bool - tagCardinalities map[string]counter - measCardinalities map[string]counter - fieldCardinalities map[string]counter - dbCardinalities map[string]counter + tagCardinalities map[string]Counter + measCardinalities map[string]Counter + fieldCardinalities map[string]Counter + dbCardinalities map[string]Counter } func printSummary(cmd *cobra.Command, p printArgs) { @@ -277,7 +277,7 @@ func printSummary(cmd *cobra.Command, p printArgs) { } // sortKeys is a quick helper to return the sorted set of a map's keys -func sortKeys(vals map[string]counter) (keys []string) { +func sortKeys(vals map[string]Counter) (keys []string) { for k := range vals { keys = append(keys, k) } @@ -335,14 +335,14 @@ 
func (a *args) walkShardDirs(root string, fn func(db, rp, id, path string) error return nil } -// counter abstracts a method of counting keys. -type counter interface { +// Counter abstracts a method of counting keys. +type Counter interface { Add(key []byte) Count() uint64 } -// newHLLCounter returns an approximate counter using HyperLogLogs for cardinality estimation. -func newHLLCounter() counter { +// NewHLLCounter returns an approximate Counter using HyperLogLogs for cardinality estimation. +func NewHLLCounter() Counter { return hllpp.New() } @@ -359,7 +359,7 @@ func (c *exactCounter) Count() uint64 { return uint64(len(c.m)) } -func newExactCounter() counter { +func NewExactCounter() Counter { return &exactCounter{ m: make(map[string]struct{}), } diff --git a/pkg/reporthelper/walkshards.go b/pkg/reporthelper/walkshards.go new file mode 100644 index 00000000000..3a745fce37e --- /dev/null +++ b/pkg/reporthelper/walkshards.go @@ -0,0 +1,71 @@ +// Package reporthelper reports statistics about TSM files. 
+package reporthelper + +import ( + "fmt" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + + "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" +) + +func IsShardDir(dir string) error { + name := filepath.Base(dir) + if id, err := strconv.Atoi(name); err != nil || id < 1 { + return fmt.Errorf("not a valid shard dir: %v", dir) + } + + return nil +} + +func WalkShardDirs(root string, fn func(db, rp, id, path string) error) error { + type location struct { + db, rp, id, path string + } + + var dirs []location + if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if info.IsDir() { + return nil + } + + if filepath.Ext(info.Name()) == "."+tsm1.TSMFileExtension { + shardDir := filepath.Dir(path) + + if err := IsShardDir(shardDir); err != nil { + return err + } + absPath, err := filepath.Abs(path) + if err != nil { + return err + } + parts := strings.Split(absPath, string(filepath.Separator)) + db, rp, id := parts[len(parts)-4], parts[len(parts)-3], parts[len(parts)-2] + dirs = append(dirs, location{db: db, rp: rp, id: id, path: path}) + return nil + } + return nil + }); err != nil { + return err + } + + sort.Slice(dirs, func(i, j int) bool { + a, _ := strconv.Atoi(dirs[i].id) + b, _ := strconv.Atoi(dirs[j].id) + return a < b + }) + + for _, shard := range dirs { + if err := fn(shard.db, shard.rp, shard.id, shard.path); err != nil { + return err + } + } + return nil +}