diff --git a/Makefile b/Makefile index f9fe4b79..61347627 100644 --- a/Makefile +++ b/Makefile @@ -93,7 +93,10 @@ cherami-cassandra-tool: $(DEPS) cherami-store-tool: $(DEPS) go build -i $(EMBED) -o cherami-store-tool cmd/tools/store/main.go -bins: cherami-server cherami-replicator-server cherami-cli cherami-admin cherami-replicator-tool cherami-cassandra-tool cherami-store-tool +cdb: $(DEPS) + go build -i $(EMBED) -o cdb cmd/tools/cdb/*.go + +bins: cherami-server cherami-replicator-server cherami-cli cherami-admin cherami-replicator-tool cherami-cassandra-tool cherami-store-tool cdb cover_profile: lint bins @mkdir -p $(BUILD) diff --git a/cmd/tools/cdb/main.go b/cmd/tools/cdb/main.go new file mode 100644 index 00000000..10d7d441 --- /dev/null +++ b/cmd/tools/cdb/main.go @@ -0,0 +1,225 @@ +package main + +import ( + "flag" + "fmt" + "math" + "os" + "path/filepath" + "strconv" + "time" + + "github.com/uber/cherami-server/storage" + "github.com/uber/cherami-server/storage/manyrocks" +) + +type arguments struct { + store string + extent string + baseDir string + start storage.Key + end storage.Key + num int + printVal bool + formatTime bool + help bool +} + +func main() { + + args := parseArgs() + + if args == nil { + return + } + + var db storage.ExtentStore + var err error + + switch args.store { + case "manyrocks": + db, err = manyrocks.OpenExtentDB(storage.ExtentUUID(args.extent), fmt.Sprintf("%s/%s", args.baseDir, args.extent)) + if err != nil { + fmt.Printf("error opening db (%s): %v\n", args.baseDir, err) + return + } + + default: + fmt.Printf("unsupported store: %s\n", args.store) + return + } + + defer db.CloseExtentDB() + + dumpExtentDB(db, args) + + return +} + +func dumpExtentDB(db storage.ExtentStore, args *arguments) { + + addr, key, err := db.SeekCeiling(args.start) + + if err != nil { + fmt.Printf("db.SeekCeiling(%x) error: %v\n", args.start, err) + return + } + + var val storage.Value + + if args.printVal { + + _, val, _, _, err = db.Get(addr) + + if 
err != nil { + fmt.Printf("db.Get(%x) errored: %v\n", addr, err) + return + } + } + + var num int + for (addr != storage.EOX) && (key < args.end) && (num < args.num) { + + // fmt.Printf("key = %d %x %v %p\n", key, key, key, key) + + if isSealExtentKey(key) { + + sealSeqNum := deconstructSealExtentKey(key) + + if sealSeqNum == seqNumUnspecifiedSeal { + fmt.Printf("0x%016v => SEALED (seqnum: unspecified)\n", key) + } else { + fmt.Printf("0x%016v => SEALED (seqnum: %d)\n", key, sealSeqNum) + } + + } else { + + ts, seqnum := deconstructKey(key) + + if args.printVal { + + var enqTime, payload, vTime string + + msg, errd := deserializeMessage(val) + enq := msg.GetEnqueueTimeUtc() + + if errd != nil { // corrupt message? + + payload = fmt.Sprintf("ERROR deserializing data: %v (val=%v)", errd, val) + + } else { + + if args.formatTime { + enqTime = time.Unix(0, enq).Format(time.RFC3339Nano) + } else { + enqTime = strconv.FormatInt(enq, 16) + } + + payload = fmt.Sprintf("seq=%d enq=%v data=%d bytes", + msg.GetSequenceNumber(), enqTime, len(msg.GetPayload().GetData())) + // payload = msg.String() + } + + if args.formatTime { + vTime = time.Unix(0, ts).Format(time.RFC3339Nano) + } else { + vTime = strconv.FormatInt(ts, 16) + } + + fmt.Printf("0x%016v => #%d ts=%v payload:[%v]\n", key, seqnum, vTime, payload) + + } else { + + fmt.Printf("0x%016v => #%d ts=%v\n", key, seqnum, ts) + } + } + + num++ + + if args.printVal { + + if key, val, addr, _, err = db.Get(addr); err != nil { + fmt.Printf("db.Get(%x) errored: %v\n", addr, err) + break + } + + } else { + + if addr, key, err = db.Next(addr); err != nil { + fmt.Printf("db.Next(%x) errored: %v\n", addr, err) + break + } + } + } + + fmt.Printf("summary: dumped %d keys in range [%v, %v)\n", num, args.start, args.end) + return +} + +func parseArgs() (args *arguments) { + + args = &arguments{} + + flag.StringVar(&args.store, "store", "manyrocks", "store") + flag.StringVar(&args.extent, "x", "", "extent") + 
flag.StringVar(&args.baseDir, "base", ".", "base dir") + + var start, end, num string + + flag.StringVar(&start, "s", "-1", "start range") + flag.StringVar(&end, "e", "-1", "end range") + flag.StringVar(&num, "n", "-1", "number of values") + + flag.BoolVar(&args.printVal, "v", false, "deserialize payload") + flag.BoolVar(&args.formatTime, "t", false, "format time") + + flag.BoolVar(&args.help, "?", false, "help") + flag.BoolVar(&args.help, "help", false, "help") + + flag.Parse() + + switch { + case args.extent == "": + cwd, _ := os.Getwd() + + args.extent = filepath.Base(cwd) + args.baseDir = filepath.Dir(cwd) + + case args.baseDir == "": + args.baseDir, _ = os.Getwd() + } + + switch i, err := strconv.ParseInt(start, 0, 64); { + case err != nil: + fmt.Printf("error parsing start arg (%s): %v\n", start, err) + return nil + case i < 0: + args.start = 0 + default: + args.start = storage.Key(i) + } + + switch i, err := strconv.ParseInt(end, 0, 64); { + case err != nil: + fmt.Printf("error parsing end arg (%s): %v\n", end, err) + return nil + case i < 0: + args.end = storage.Key(math.MaxInt64) + default: + args.end = storage.Key(i) + } + + switch i, err := strconv.ParseInt(num, 0, 64); { + case err != nil: + fmt.Printf("error parsing num arg (%s): %v\n", num, err) + return nil + case i < 0: + args.num = math.MaxInt64 + default: + args.num = int(i) + } + + // fmt.Printf("args=%v\n", args) + + return args +} diff --git a/cmd/tools/cdb/utils.go b/cmd/tools/cdb/utils.go new file mode 100644 index 00000000..fe5f94b7 --- /dev/null +++ b/cmd/tools/cdb/utils.go @@ -0,0 +1,54 @@ +package main + +import ( + "math" + + "github.com/uber/cherami-server/storage" + "github.com/uber/cherami-thrift/.generated/go/store" + + "github.com/apache/thrift/lib/go/thrift" +) + +// -- decode message/address -- // +const ( + seqNumBits = 26 + + invalidKey = math.MaxInt64 + + seqNumBitmask = (int64(1) << seqNumBits) - 1 + timestampBitmask = math.MaxInt64 &^ seqNumBitmask + seqNumMax = 
int64(math.MaxInt64-2) & seqNumBitmask + + seqNumUnspecifiedSeal = int64(math.MaxInt64 - 1) +) + +func deconstructKey(key storage.Key) (visibilityTime int64, seqNum int64) { + return int64(int64(key) & timestampBitmask), int64(int64(key) & seqNumBitmask) +} + +func deconstructSealExtentKey(key storage.Key) (seqNum int64) { + + seqNum = int64(key) & seqNumBitmask + + // we use the special seqnum ('MaxInt64 - 1') when the extent has been sealed + // at an "unspecified" seqnum; check for this case, and return appropriate value + if seqNum == (seqNumBitmask - 1) { + seqNum = seqNumUnspecifiedSeal + } + + return +} + +func isSealExtentKey(key storage.Key) bool { + return key != storage.InvalidKey && (int64(key)&timestampBitmask) == timestampBitmask +} + +func deserializeMessage(data []byte) (*store.AppendMessage, error) { + msg := &store.AppendMessage{} + deserializer := thrift.NewTDeserializer() + if err := deserializer.Read(msg, data); err != nil { + return nil, err + } + + return msg, nil +} diff --git a/storage/manyrocks/manyrocks.go b/storage/manyrocks/manyrocks.go index bebdc0a6..feae042f 100644 --- a/storage/manyrocks/manyrocks.go +++ b/storage/manyrocks/manyrocks.go @@ -35,6 +35,8 @@ import ( "github.com/uber-common/bark" "github.com/uber/cherami-server/common" s "github.com/uber/cherami-server/storage" + + "github.com/Sirupsen/logrus" ) const rocksdbNotExistError = "does not exist (create_if_missing is false)" @@ -115,6 +117,49 @@ func New(opts *Opts, log bark.Logger) (*ManyRocks, error) { }, nil } +// OpenExtentDB gets a handle to the raw extent DB +func OpenExtentDB(id s.ExtentUUID, path string) (*Rock, error) { + + // setup RocksDB options + opts := gorocksdb.NewDefaultOptions() + + opts.SetCreateIfMissing(false) + + db, err := gorocksdb.OpenDb(opts, path) + if err != nil { + return nil, err + } + + // setup read/write options used with IO + readOpts := gorocksdb.NewDefaultReadOptions() + + writeOpts := gorocksdb.NewDefaultWriteOptions() + + return &Rock{ + id: 
id, + path: path, + keyPattern: s.IncreasingKeys, + notify: func(key s.Key, addr s.Address) {}, + db: db, + opts: opts, + readOpts: readOpts, + writeOpts: writeOpts, + store: &ManyRocks{ + logger: bark.NewLoggerFromLogrus(logrus.StandardLogger()), + }, + }, nil +} + +// CloseExtentDB closes the handle to the raw extent DB +func (t *Rock) CloseExtentDB() { + + t.writeOpts.Destroy() + t.readOpts.Destroy() + t.db.Close() + t.opts.Destroy() + t.notify = nil +} + // getDBPath returns the base-dir to use for the DB func (t *ManyRocks) getDBPath(id s.ExtentUUID) string { return fmt.Sprintf("%s/%v", t.opts.BaseDir, id) // NB: 'baseDir' should already created diff --git a/storage/storage.go b/storage/storage.go index 80b7d6be..f6cff058 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -85,6 +85,9 @@ type StoreManager interface { GetExtentInfo(id ExtentUUID) (*ExtentInfo, error) } +// OpenExtentDB gets a handle to the raw extent DB +// func OpenExtentDB(id ExtentUUID, path string) (ExtentStore, error) + // -- EXTENT STORE -- // // The extent-store is designed and expected to be used concurrently by // at most one-writer and any number of readers. @@ -302,6 +305,14 @@ type ExtentStore interface { // Returns: // none Close() + + // CloseExtentDB closes the handle when opened using OpenExtentDB + // Args: + // none + // + // Returns: + // none + CloseExtentDB() } // -- Misc utility functions -- //