diff --git a/test/drv/junoload/junoload.go b/test/drv/junoload/junoload.go index 9297f65..c4e7aaf 100644 --- a/test/drv/junoload/junoload.go +++ b/test/drv/junoload/junoload.go @@ -24,7 +24,9 @@ import ( "fmt" "math/rand" "net/http" + _ "net/http/pprof" "os" + "strconv" "strings" "sync" "time" @@ -37,6 +39,7 @@ import ( "juno/pkg/cmd" "juno/pkg/logging/cal" "juno/pkg/sec" + "juno/pkg/util" "juno/pkg/version" ) @@ -75,6 +78,8 @@ type ( logLevel string isVariable bool disableGetTTL bool + keys string + randomize bool } ) @@ -96,6 +101,7 @@ const ( func (d *SyncTestDriver) setDefaultConfig() { d.config.SetDefault() + d.config.Sec = sec.DefaultConfig d.config.Cal.Default() d.config.Cal.Poolname = "junoload" @@ -112,6 +118,9 @@ func (d *SyncTestDriver) setDefaultConfig() { d.config.StatOutputRate = kDefaultStatOutputRate d.config.isVariable = false d.config.disableGetTTL = false + d.config.sKey = -1 + d.config.eKey = -1 + d.config.randomize = false } func (d *SyncTestDriver) Init(name string, desc string) { @@ -141,6 +150,8 @@ func (d *SyncTestDriver) Init(name string, desc string) { d.StringOption(&d.cmdOpts.dbpath, "dbpath", "", "to display rocksdb stats") d.StringOption(&d.cmdOpts.logLevel, "log-level", "info", "specify log level") d.BoolOption(&d.cmdOpts.disableGetTTL, "disableGetTTL", false, "not use random ttl for get operation") + d.StringOption(&d.cmdOpts.keys, "keys", "", "key strange, separated with ,") + d.BoolOption(&d.cmdOpts.randomize, "r|randomize", false, "randomize, get/update/delete") t := &SyncTestDriver{} t.setDefaultConfig() @@ -160,7 +171,28 @@ func (d *SyncTestDriver) Init(name string, desc string) { d.AddExample(name+" -s 127.0.0.1:8080 -ssl", "\trun the driver with SSL") d.AddExample(name+" -c config.toml", "\trun the driver with options specified in config.toml") +} + +func parseKeys(key string) (start int, last int) { + var err error + list := strings.Split(key, ",") + start, err = strconv.Atoi(list[0]) + if err != nil { + glog.Exitf("%s", err) + } + if len(list) < 2 { + last = start + 1 + } else { + last, err = strconv.Atoi(list[1]) + if err != nil { + glog.Exitf("%s", err) + } + } + if start < 0 || last < 0 { + glog.Exitf("Negative range params are not allowed.") + } + return } func (d *SyncTestDriver) Parse(args []string) (err error) { @@ -235,6 +267,14 @@ func (d *SyncTestDriver) Parse(args []string) (err error) { d.config.HttpMonAddr = ":" + d.config.HttpMonAddr } + if d.cmdOpts.keys != "" { + start, end := parseKeys(d.cmdOpts.keys) + d.config.sKey = int64(start) + d.config.eKey = int64(end) + } + + d.config.randomize = d.cmdOpts.randomize + d.config.Cal.Default() if d.config.Cal.Enabled { @@ -303,6 +343,10 @@ func (d *SyncTestDriver) Exec() { var wg sync.WaitGroup chDone := make(chan bool) + var numRunningExecutors util.AtomicCounter + numRunningExecutors.Reset() + numRunningExecutors.Add(int32(d.config.NumExecutor)) + if d.config.NumExecutor > 0 { wg.Add(1) go func() { @@ -322,9 +366,14 @@ func (d *SyncTestDriver) Exec() { case <-ticker.C: d.movingStats.PrettyPrint(os.Stdout) d.movingStats.Reset() + if numRunningExecutors.Get() == 0 { + timer.Stop() + ticker.Stop() + close(chDone) + break loop + } } } - }() } else { glog.Errorf("number of executor specified is zero") @@ -333,15 +382,17 @@ func (d *SyncTestDriver) Exec() { d.tmStart = time.Now() d.stats.Init() d.movingStats.Init() + var start int = -1 + var end int = -1 + for i := 0; i < d.config.NumExecutor; i++ { - size := d.cmdOpts.numKeys / 2 - num := size / d.config.NumExecutor - offGet := i*num + size + if d.config.sKey >= 0 && d.config.eKey > d.config.sKey { + range_size := (d.config.eKey - d.config.sKey + int64(d.config.NumExecutor-1)) / int64(d.config.NumExecutor) - if size > MaxDeletes { - size = MaxDeletes + start = i*int(range_size) + int(d.config.sKey) + end = start + int(range_size) - 1 //pad } - offDel := i * (size / d.config.NumExecutor) + //fmt.Printf("s=%d, e=%d\n", start, end) cli, err := client.New(d.config.Config) if err != nil { glog.Error(err) @@ -350,15 +401,18 @@ func (d *SyncTestDriver) Exec() { eng := &TestEngine{ rdgen: d.randgen, recStore: RecordStore{ - numKeys: num, - offsetDel: offDel, - offsetGet: offGet}, + nextKey: int(start), + sKey: int(start), + eKey: int(end), + randomize: d.config.randomize, + }, reqSequence: d.reqSequence, // chDone: chDone, client: cli, stats: &d.stats, movingStats: &d.movingStats, numReqPerSecond: d.config.NumReqPerSecond, + numRunningExec: &numRunningExecutors, } eng.Init() wg.Add(1) diff --git a/test/drv/junoload/tdscfg.go b/test/drv/junoload/tdscfg.go index 46fb27e..8e57785 100644 --- a/test/drv/junoload/tdscfg.go +++ b/test/drv/junoload/tdscfg.go @@ -43,5 +43,8 @@ type ( StatOutputRate int isVariable bool disableGetTTL bool + sKey int64 + eKey int64 + randomize bool } ) diff --git a/test/drv/junoload/tsteng.go b/test/drv/junoload/tsteng.go index a9970bf..196dcaf 100644 --- a/test/drv/junoload/tsteng.go +++ b/test/drv/junoload/tsteng.go @@ -16,11 +16,11 @@ // See the License for the specific language governing permissions and // limitations under the License. // - package main import ( "encoding/binary" + "errors" "fmt" "math" "math/rand" @@ -31,6 +31,8 @@ import ( "juno/third_party/forked/golang/glog" + "juno/pkg/util" + uuid "github.com/satori/go.uuid" ) @@ -43,7 +45,7 @@ const ( kNumRequestTypes ) -const MaxDeletes = 10000 +var ErrNoMoreKeys = errors.New("no more keys") type ( RequestType uint8 @@ -53,14 +55,11 @@ type ( } RecordStore struct { - records []Record - // Used for preloaded keys - numKeys int - currGet int - nextDelete int - offsetDel int - offsetGet int - LastDelete bool + records []Record + sKey int + eKey int + nextKey int + randomize bool } TestEngine struct { @@ -73,6 +72,7 @@ type ( stats *Statistics movingStats *Statistics numReqPerSecond int + numRunningExec *util.AtomicCounter } InvokeFunc func() error ) @@ -128,27 +128,35 @@ func (r *Record) isExpired() bool { } func (s *RecordStore) Add(rec Record) { - if s.numKeys > 0 { + if s.isKeyRange() { return } + s.records = append(s.records, rec) } func (s *RecordStore) display() { - glog.Infof("numKeys=%d currGet=%d nextDelete=%d offsetDel=%d offsetGet=%d", - s.numKeys, s.currGet, s.nextDelete, s.offsetDel, s.offsetGet) + /* + glog.Infof("numKeys=%d currGet=%d nextDelete=%d offsetDel=%d offsetGet=%d", + s.numKeys, s.currGet, s.nextDelete, s.offsetDel, s.offsetGet)*/ } func (s *RecordStore) takeRecord() (rec Record, err error) { - if s.numKeys > 0 { // preloaded keys - rec = Record{ - key: NewRandomKey(s.offsetDel + s.nextDelete), + if s.isKeyRange() { + var key_id int + if s.randomize { + key_id = s.sKey + rand.Intn(s.eKey-s.sKey+1) + } else { + if s.nextKey > s.eKey { + err = ErrNoMoreKeys + return + } + key_id = s.nextKey + s.nextKey++ } - if s.endOfDelete() { - err = fmt.Errorf("no more record for destroy") - return + rec = Record{ + key: NewRandomKey(key_id), } - s.nextDelete++ return } @@ -164,17 +172,23 @@ func (s *RecordStore) takeRecord() (rec Record, err error) { } func (s *RecordStore) getRecord() (rec Record, err error) { - if s.numKeys > 0 { // preloaded keys - count := s.numKeys - if s.numKeys >= MaxDeletes { - count = s.numKeys >> 2 + if s.isKeyRange() { + var key_id int + if s.randomize { + key_id = s.sKey + rand.Intn(s.eKey-s.sKey+1) + } else { + if s.nextKey > s.eKey { + err = ErrNoMoreKeys + return + } + key_id = s.nextKey + s.nextKey++ } - k := expRand(count) - s.currGet = k rec = Record{ - key: NewRandomKey(s.offsetGet + k), + key: NewRandomKey(key_id), } + return } @@ -193,32 +207,26 @@ func (s *RecordStore) getRecord() (rec Record, err error) { } func (s *RecordStore) empty() bool { - return len(s.records) == 0 && s.numKeys == 0 -} - -func (s *RecordStore) endOfDelete() bool { - return s.numKeys > 0 && - (s.nextDelete >= s.numKeys || s.nextDelete >= MaxDeletes) + return len(s.records) == 0 && s.isKeyRange() == false } func (s *RecordStore) Get() (rec Record, err error) { for !s.empty() { rec, err = s.getRecord() - if err == nil { - return - } + return } err = fmt.Errorf("no record") return } func (s *RecordStore) Take() (rec Record, err error) { - for !s.empty() && !s.endOfDelete() { + if s.isKeyRange() { + return s.takeRecord() + } + + if !s.empty() { rec, err = s.takeRecord() - if err == nil && !rec.isExpired() { - if s.endOfDelete() { - s.LastDelete = true - } + if err == nil { return } } @@ -226,6 +234,23 @@ func (s *RecordStore) Take() (rec Record, err error) { return } +func (s *RecordStore) getNextKey() (key []byte) { + if s.sKey == -1 { + key = newTestKey() + } else { + if s.nextKey > s.eKey { + return nil + } + key = NewRandomKey(s.nextKey) + s.nextKey++ + } + return +} + +func (s *RecordStore) isKeyRange() bool { + return s.sKey > -1 +} + func (e *TestEngine) Init() { e.invokeFuncs = make([]InvokeFunc, kNumRequestTypes) e.invokeFuncs[kRequestTypeCreate] = e.invokeCreate @@ -235,70 +260,35 @@ func (e *TestEngine) Init() { e.invokeFuncs[kRequestTypeDestroy] = e.invokeDestroy } -func (e *TestEngine) restoreData() { - if e.recStore.numKeys <= 0 || e.recStore.nextDelete <= 0 { - return - } - - count := e.recStore.nextDelete - - // Add back deleted keys - glog.Infof("Add back deleted keys: count=%d", count) - for i := 0; i < count; i++ { - - now := time.Now() - - key := NewRandomKey(e.recStore.offsetDel + i) - _, err := e.client.Create(key, e.rdgen.createPayload()) - tm := time.Since(now) - - e.stats.Put(kRequestTypeCreate, tm, err) - e.movingStats.Put(kRequestTypeCreate, tm, err) - if err != nil { - glog.Errorf("%s error: %s", kRequestTypeCreate.String(), err) - e.recStore.display() - } - } -} - func (e *TestEngine) Run(wg *sync.WaitGroup, chDone <-chan bool) { defer wg.Done() + defer e.numRunningExec.Add(-1) startTime := time.Now() var numreq int = 0 errCount := 0 + for { for _, item := range e.reqSequence.items { - if e.recStore.numKeys > 0 && - item.reqType == kRequestTypeCreate { - continue - } for i := 0; i < item.numRequests; i++ { select { case <-chDone: - e.restoreData() return default: now := time.Now() err := e.invoke(item.reqType) tm := time.Since(now) - if item.reqType == kRequestTypeDestroy && - e.recStore.endOfDelete() { - if e.recStore.LastDelete { - e.recStore.LastDelete = false - } else { - continue - } + if errors.Is(err, ErrNoMoreKeys) { + return } + e.stats.Put(item.reqType, tm, err) e.movingStats.Put(item.reqType, tm, err) if err != nil { glog.Errorf("%s error: %s", item.reqType.String(), err) - if e.recStore.numKeys > 0 { - e.recStore.display() - errCount++ - if errCount > 100 { - return - } + e.recStore.display() + errCount++ + if errCount > 100 { + //return } } diff := now.Sub(startTime) @@ -355,10 +345,13 @@ func (e *TestEngine) checkSpeedForVariableTp(now time.Time, numReq int, startTim } func (e *TestEngine) invokeCreate() (err error) { - if e.recStore.numKeys > 0 { + + key := e.recStore.getNextKey() + if key == nil { + err = ErrNoMoreKeys return } - key := newTestKey() + var ctx client.IContext if ctx, err = e.client.Create(key, e.rdgen.createPayload(), client.WithTTL(e.rdgen.getTTL())); err == nil { @@ -405,9 +398,6 @@ func (e *TestEngine) invokeSet() (err error) { func (e *TestEngine) invokeDestroy() (err error) { var rec Record - if e.recStore.endOfDelete() { - return nil - } if rec, err = e.recStore.Take(); err == nil { err = e.client.Destroy(rec.key) }