Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
cdb: debug tool to read raw extent data from store (#171)
Browse files Browse the repository at this point in the history
* cdb: read raw data from manyrocks

* open/close manyrocks db

* fix import path

* storage interface

* add missing source file
  • Loading branch information
Kiran RG committed Apr 26, 2017
1 parent 6442730 commit d81c1fa
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 1 deletion.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ cherami-cassandra-tool: $(DEPS)
cherami-store-tool: $(DEPS)
go build -i $(EMBED) -o cherami-store-tool cmd/tools/store/main.go

bins: cherami-server cherami-replicator-server cherami-cli cherami-admin cherami-replicator-tool cherami-cassandra-tool cherami-store-tool
cdb: $(DEPS)
go build -i $(EMBED) -o cdb cmd/tools/cdb/*.go

bins: cherami-server cherami-replicator-server cherami-cli cherami-admin cherami-replicator-tool cherami-cassandra-tool cherami-store-tool cdb

cover_profile: lint bins
@mkdir -p $(BUILD)
Expand Down
225 changes: 225 additions & 0 deletions cmd/tools/cdb/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package main

import (
"flag"
"fmt"
"math"
"os"
"path/filepath"
"strconv"
"time"

"github.com/uber/cherami-server/storage"
"github.com/uber/cherami-server/storage/manyrocks"
)

// arguments holds the parsed command-line options of the cdb tool.
type arguments struct {
	// where to read from
	store   string // store implementation to open (-store)
	baseDir string // directory that contains the extent's directory (-base)
	extent  string // extent to dump; also the name of its directory (-x)

	// what to dump
	start, end storage.Key // key range to dump: [start, end) (-s, -e)
	num        int         // maximum number of keys to dump (-n)

	// how to print it
	printVal   bool // also fetch and deserialize each value (-v)
	formatTime bool // print timestamps as RFC3339 instead of hex (-t)

	help bool // set by -? / -help
}

// main parses the command-line arguments, opens the extent DB they name and
// dumps its contents to stdout.
func main() {

	args := parseArgs()
	if args == nil {
		return // nothing to do
	}

	// the extent's DB lives in a directory named after the extent, under the
	// base dir; build the path portably rather than by gluing with "/"
	path := filepath.Join(args.baseDir, args.extent)

	var db storage.ExtentStore

	switch args.store {
	case "manyrocks":
		rock, err := manyrocks.OpenExtentDB(storage.ExtentUUID(args.extent), path)
		if err != nil {
			// report the path we actually tried to open, not just the base dir
			fmt.Printf("error opening db (%s): %v\n", path, err)
			return
		}
		db = rock

	default:
		fmt.Printf("unsupported store: %s\n", args.store)
		return
	}

	defer db.CloseExtentDB()

	dumpExtentDB(db, args)
}

// dumpExtentDB prints one line per entry in 'db' whose key falls in the range
// [args.start, args.end), stopping after at most args.num entries, and then
// prints a summary line.
//
// With args.printVal set, the value of every entry is also fetched and a
// summary of the deserialized message is printed; with args.formatTime set,
// timestamps in that output are printed as RFC3339 instead of hex.
func dumpExtentDB(db storage.ExtentStore, args *arguments) {

	// find the first entry to dump
	addr, key, err := db.SeekCeiling(args.start)
	if err != nil {
		fmt.Printf("db.SeekCeiling(%x) error: %v\n", args.start, err)
		return
	}

	var num int

	for (addr != storage.EOX) && (key < args.end) && (num < args.num) {

		var val storage.Value

		if args.printVal {
			// fetch the value that belongs to the entry printed in *this*
			// iteration, so key and value can never get out of step; the
			// other results of Get are ignored because the iteration is
			// driven by Next (below) in both modes
			if _, val, _, _, err = db.Get(addr); err != nil {
				fmt.Printf("db.Get(%x) errored: %v\n", addr, err)
				break
			}
		}

		// NOTE(review): keys are printed with %v after a "0x" prefix; unless
		// storage.Key formats itself as hex this prints decimal digits --
		// confirm the intended base before changing the format strings.
		switch {
		case isSealExtentKey(key):

			if sealSeqNum := deconstructSealExtentKey(key); sealSeqNum == seqNumUnspecifiedSeal {
				fmt.Printf("0x%016v => SEALED (seqnum: unspecified)\n", key)
			} else {
				fmt.Printf("0x%016v => SEALED (seqnum: %d)\n", key, sealSeqNum)
			}

		case args.printVal:

			ts, seqnum := deconstructKey(key)
			fmt.Printf("0x%016v => #%d ts=%v payload:[%v]\n", key, seqnum,
				formatNanos(ts, args.formatTime), describeValue(val, args.formatTime))

		default:

			ts, seqnum := deconstructKey(key)
			fmt.Printf("0x%016v => #%d ts=%v\n", key, seqnum, ts)
		}

		num++

		// move on to the next entry; keep the results in temporaries so that
		// a failure is reported against the address we actually passed in
		nextAddr, nextKey, nextErr := db.Next(addr)
		if nextErr != nil {
			fmt.Printf("db.Next(%x) errored: %v\n", addr, nextErr)
			break
		}

		addr, key = nextAddr, nextKey
	}

	fmt.Printf("summary: dumped %d keys in range [%v, %v)\n", num, args.start, args.end)
}

// describeValue deserializes 'val' and returns a one-line summary of the
// message, or a description of the failure if it could not be deserialized.
func describeValue(val storage.Value, formatTime bool) string {

	msg, err := deserializeMessage(val)
	if err != nil { // corrupt message?
		return fmt.Sprintf("ERROR deserializing data: %v (val=%v)", err, val)
	}

	// 'msg' is nil when deserialization fails, so it must only be touched
	// after the error check above
	return fmt.Sprintf("seq=%d enq=%v data=%d bytes",
		msg.GetSequenceNumber(), formatNanos(msg.GetEnqueueTimeUtc(), formatTime), len(msg.GetPayload().GetData()))
}

// formatNanos renders a timestamp either as RFC3339 with nanoseconds
// (interpreting 'ns' the way time.Unix(0, ns) does) when 'pretty' is set, or
// as a hex number otherwise.
func formatNanos(ns int64, pretty bool) string {
	if pretty {
		return time.Unix(0, ns).Format(time.RFC3339Nano)
	}
	return strconv.FormatInt(ns, 16)
}

// parseArgs parses the command line into an 'arguments' struct.
//
// It returns nil when there is nothing for the caller to do: help was
// requested (the usage is printed), or an argument could not be processed
// (the reason is printed).
func parseArgs() (args *arguments) {

	args = &arguments{}

	flag.StringVar(&args.store, "store", "manyrocks", "store")
	flag.StringVar(&args.extent, "x", "", "extent")
	flag.StringVar(&args.baseDir, "base", ".", "base dir")

	var start, end, num string

	flag.StringVar(&start, "s", "-1", "start range")
	flag.StringVar(&end, "e", "-1", "end range")
	flag.StringVar(&num, "n", "-1", "number of values")

	flag.BoolVar(&args.printVal, "v", false, "deserialize payload")
	flag.BoolVar(&args.formatTime, "t", false, "format time")

	flag.BoolVar(&args.help, "?", false, "help")
	flag.BoolVar(&args.help, "help", false, "help")

	flag.Parse()

	// -? / -help: show the usage instead of silently carrying on with a dump
	if args.help {
		flag.Usage()
		return nil
	}

	// fall back on the current directory when no extent (or an empty base
	// dir) was given
	if args.extent == "" || args.baseDir == "" {

		cwd, err := os.Getwd()
		if err != nil {
			fmt.Printf("error finding current directory: %v\n", err)
			return nil
		}

		if args.extent == "" {
			// treat the current directory as the extent's own directory
			args.extent = filepath.Base(cwd)
			args.baseDir = filepath.Dir(cwd)
		} else {
			args.baseDir = cwd
		}
	}

	// largest value of 'int' on this platform (math.MaxInt64 does not fit in
	// an int on 32-bit targets)
	const maxInt = int(^uint(0) >> 1)

	var ok bool
	var i int64

	// negative (the default) means: from the very first key
	if i, ok = parseIntArg("start", start, 0); !ok {
		return nil
	}
	args.start = storage.Key(i)

	// negative (the default) means: up to the largest key
	if i, ok = parseIntArg("end", end, math.MaxInt64); !ok {
		return nil
	}
	args.end = storage.Key(i)

	// negative (the default) means: no limit
	if i, ok = parseIntArg("num", num, int64(maxInt)); !ok {
		return nil
	}
	if i > int64(maxInt) {
		i = int64(maxInt) // clamp rather than truncate when converting to int
	}
	args.num = int(i)

	// fmt.Printf("args=%v\n", args)

	return args
}

// parseIntArg parses 'val' as a 64-bit integer, with the base implied by its
// prefix. A negative value means "not specified" and yields 'def'. On a parse
// error it prints a message naming the argument and returns false.
func parseIntArg(name, val string, def int64) (int64, bool) {

	i, err := strconv.ParseInt(val, 0, 64)

	switch {
	case err != nil:
		fmt.Printf("error parsing %s arg (%s): %v\n", name, val, err)
		return 0, false
	case i < 0:
		return def, true
	}

	return i, true
}
54 changes: 54 additions & 0 deletions cmd/tools/cdb/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package main

import (
"math"

"github.com/uber/cherami-server/storage"
"github.com/uber/cherami-thrift/.generated/go/store"

"github.com/apache/thrift/lib/go/thrift"
)

// -- decode message/address -- //
// Layout of a message key: the low 'seqNumBits' bits hold the sequence
// number; the remaining bits (sign bit excluded) hold the timestamp.
const (
	// number of low-order bits of a key that hold the sequence number
	seqNumBits = 26

	invalidKey = math.MaxInt64

	// mask that selects the sequence-number bits of a key
	seqNumBitmask = int64(1)<<seqNumBits - 1

	// mask that selects the timestamp bits of a key
	timestampBitmask = ^seqNumBitmask & math.MaxInt64

	seqNumMax = seqNumBitmask & int64(math.MaxInt64-2)

	// value reported for an extent that was sealed at an "unspecified" seqnum
	seqNumUnspecifiedSeal = int64(math.MaxInt64) - 1
)

// deconstructKey splits a key into its two components: the visibility time
// (the timestamp bits, left in place) and the sequence number (the low bits).
func deconstructKey(key storage.Key) (visibilityTime int64, seqNum int64) {
	raw := int64(key)

	visibilityTime = raw & timestampBitmask
	seqNum = raw & seqNumBitmask

	return visibilityTime, seqNum
}

// deconstructSealExtentKey extracts the sequence number an extent was sealed
// at from its seal key.
//
// An extent sealed at an "unspecified" seqnum carries the low bits of the
// special seqnum ('MaxInt64 - 1') in its key; that case is detected here and
// reported as seqNumUnspecifiedSeal.
func deconstructSealExtentKey(key storage.Key) (seqNum int64) {

	if low := int64(key) & seqNumBitmask; low != seqNumBitmask-1 {
		return low
	}

	return seqNumUnspecifiedSeal
}

// isSealExtentKey reports whether 'key' is a seal key: a key other than
// storage.InvalidKey that has every timestamp bit set.
func isSealExtentKey(key storage.Key) bool {
	if key == storage.InvalidKey {
		return false
	}

	return int64(key)&timestampBitmask == timestampBitmask
}

// deserializeMessage decodes 'data' into an AppendMessage using a thrift
// deserializer. On failure the returned message is nil.
func deserializeMessage(data []byte) (*store.AppendMessage, error) {
	var msg store.AppendMessage

	if err := thrift.NewTDeserializer().Read(&msg, data); err != nil {
		return nil, err
	}

	return &msg, nil
}
45 changes: 45 additions & 0 deletions storage/manyrocks/manyrocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/uber-common/bark"
"github.com/uber/cherami-server/common"
s "github.com/uber/cherami-server/storage"

"github.com/Sirupsen/logrus"
)

const rocksdbNotExistError = "does not exist (create_if_missing is false)"
Expand Down Expand Up @@ -115,6 +117,49 @@ func New(opts *Opts, log bark.Logger) (*ManyRocks, error) {
}, nil
}

// OpenExtentDB gets a handle to the raw extent DB found at 'path'; the DB is
// not created if it does not already exist.
//
// Args:
//	id: the extent the DB belongs to
//	path: directory of the extent's DB
//
// Returns:
//	a handle that should be released using CloseExtentDB, or the error from
//	opening the DB
func OpenExtentDB(id s.ExtentUUID, path string) (*Rock, error) {

	// setup RocksDB options
	opts := gorocksdb.NewDefaultOptions()

	opts.SetCreateIfMissing(false)

	db, err := gorocksdb.OpenDb(opts, path)
	if err != nil {
		// nobody else holds on to the options when the open fails, so
		// release them here instead of leaking them
		opts.Destroy()
		return nil, err
	}

	// setup read/write options used with IO
	readOpts := gorocksdb.NewDefaultReadOptions()

	writeOpts := gorocksdb.NewDefaultWriteOptions()

	return &Rock{
		id:         id,
		path:       path,
		keyPattern: s.IncreasingKeys,
		notify:     func(key s.Key, addr s.Address) {}, // no-op: nobody to notify
		db:         db,
		opts:       opts,
		readOpts:   readOpts,
		writeOpts:  writeOpts,
		store: &ManyRocks{
			logger: bark.NewLoggerFromLogrus(logrus.StandardLogger()),
		},
	}, nil
}

// CloseExtentDB closes the handle to the raw extent DB, releasing the DB and
// the options that OpenExtentDB set up for it.
func (t *Rock) CloseExtentDB() {

	t.notify = nil

	// options used with IO
	t.readOpts.Destroy()
	t.writeOpts.Destroy()

	// the DB itself, followed by the options it was opened with
	t.db.Close()
	t.opts.Destroy()
}

// getDBPath returns the base-dir to use for the DB
func (t *ManyRocks) getDBPath(id s.ExtentUUID) string {
return fmt.Sprintf("%s/%v", t.opts.BaseDir, id) // NB: 'baseDir' should already created
Expand Down
11 changes: 11 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type StoreManager interface {
GetExtentInfo(id ExtentUUID) (*ExtentInfo, error)
}

// OpenExtentDB gets a handle to the raw extent DB
// func OpenExtentDB(id ExtentUUID, path string) (ExtentStore, error)

// -- EXTENT STORE -- //
// The extent-store is designed and expected to be used concurrently by
// at most one-writer and any number of readers.
Expand Down Expand Up @@ -302,6 +305,14 @@ type ExtentStore interface {
// Returns:
// none
Close()

// CloseExtentDB closes the handle that was opened using OpenExtentDB
// Args:
// none
//
// Returns:
// none
CloseExtentDB()
}

// -- Misc utility functions -- //
Expand Down

0 comments on commit d81c1fa

Please sign in to comment.