Skip to content

Commit

Permalink
perf: improve batch performance
Browse files Browse the repository at this point in the history
  • Loading branch information
Sora233 committed Mar 22, 2024
1 parent a776163 commit c9de438
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 134 deletions.
243 changes: 110 additions & 133 deletions batch.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rosedb

import (
"bytes"
"fmt"
"sync"
"time"
Expand All @@ -22,14 +21,15 @@ import (
//
// You must call Commit method to commit the batch, otherwise the DB will be locked.
type Batch struct {
db *DB
pendingWrites []*LogRecord // save the data to be written
options BatchOptions
mu sync.RWMutex
committed bool // whether the batch has been committed
rollbacked bool // whether the batch has been rollbacked
batchId *snowflake.Node
buffers []*bytebufferpool.ByteBuffer
db *DB
pendingWrites []*LogRecord // save the data to be written
pendingWritesMap map[string]int // map record key to index
options BatchOptions
mu sync.RWMutex
committed bool // whether the batch has been committed
rollbacked bool // whether the batch has been rollbacked
batchId *snowflake.Node
buffers []*bytebufferpool.ByteBuffer
}

// NewBatch creates a new Batch instance.
Expand Down Expand Up @@ -77,6 +77,9 @@ func (b *Batch) init(rdonly, sync bool, db *DB) *Batch {
func (b *Batch) reset() {
b.db = nil
b.pendingWrites = b.pendingWrites[:0]
for key := range b.pendingWritesMap {
delete(b.pendingWritesMap, key)
}
b.committed = false
b.rollbacked = false
// put all buffers back to the pool
Expand Down Expand Up @@ -116,19 +119,12 @@ func (b *Batch) Put(key []byte, value []byte) error {

b.mu.Lock()
// write to pendingWrites
var record *LogRecord
// if the key exists in pendingWrites, update the value directly
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
}
var record = b.lookupPendingWrites(key)
if record == nil {
// if the key does not exist in pendingWrites, write a new record
// the record will be put back to the pool when the batch is committed or rollbacked
record = b.db.recordPool.Get().(*LogRecord)
b.pendingWrites = append(b.pendingWrites, record)
b.appendPendingWrites(key, record)
}

record.Key, record.Value = key, value
Expand All @@ -152,19 +148,12 @@ func (b *Batch) PutWithTTL(key []byte, value []byte, ttl time.Duration) error {

b.mu.Lock()
// write to pendingWrites
var record *LogRecord
// if the key exists in pendingWrites, update the value directly
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
}
var record = b.lookupPendingWrites(key)
if record == nil {
// if the key does not exist in pendingWrites, write a new record
// the record will be put back to the pool when the batch is committed or rollbacked
record = b.db.recordPool.Get().(*LogRecord)
b.pendingWrites = append(b.pendingWrites, record)
b.appendPendingWrites(key, record)
}

record.Key, record.Value = key, value
Expand All @@ -186,13 +175,7 @@ func (b *Batch) Get(key []byte) ([]byte, error) {
now := time.Now().UnixNano()
// get from pendingWrites
b.mu.RLock()
var record *LogRecord
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
}
var record = b.lookupPendingWrites(key)
b.mu.RUnlock()

// if the record is in pendingWrites, return the value directly
Expand Down Expand Up @@ -240,20 +223,19 @@ func (b *Batch) Delete(key []byte) error {
b.mu.Lock()
// only need key and type when deleting a value.
var exist bool
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
b.pendingWrites[i].Type = LogRecordDeleted
b.pendingWrites[i].Value = nil
b.pendingWrites[i].Expire = 0
exist = true
break
}
var record = b.lookupPendingWrites(key)
if record != nil {
record.Type = LogRecordDeleted
record.Value = nil
record.Expire = 0
exist = true
}
if !exist {
b.pendingWrites = append(b.pendingWrites, &LogRecord{
record = &LogRecord{
Key: key,
Type: LogRecordDeleted,
})
}
b.appendPendingWrites(key, record)
}
b.mu.Unlock()

Expand All @@ -272,13 +254,7 @@ func (b *Batch) Exist(key []byte) (bool, error) {
now := time.Now().UnixNano()
// check if the key exists in pendingWrites
b.mu.RLock()
var record *LogRecord
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
}
var record = b.lookupPendingWrites(key)
b.mu.RUnlock()

if record != nil {
Expand Down Expand Up @@ -320,13 +296,7 @@ func (b *Batch) Expire(key []byte, ttl time.Duration) error {
b.mu.Lock()
defer b.mu.Unlock()

var record *LogRecord
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
}
var record = b.lookupPendingWrites(key)

// if the key exists in pendingWrites, update the expiry time directly
if record != nil {
Expand All @@ -335,30 +305,30 @@ func (b *Batch) Expire(key []byte, ttl time.Duration) error {
return ErrKeyNotFound
}
record.Expire = time.Now().Add(ttl).UnixNano()
} else {
// if the key does not exist in pendingWrites, get the value from wal
position := b.db.index.Get(key)
if position == nil {
return ErrKeyNotFound
}
chunk, err := b.db.dataFiles.Read(position)
if err != nil {
return err
}
return nil
}
// if the key does not exist in pendingWrites, get the value from wal
position := b.db.index.Get(key)
if position == nil {
return ErrKeyNotFound
}
chunk, err := b.db.dataFiles.Read(position)
if err != nil {
return err
}

now := time.Now()
record = decodeLogRecord(chunk)
// if the record is deleted or expired, we can assume that the key does not exist,
// and delete the key from the index
if record.Type == LogRecordDeleted || record.IsExpired(now.UnixNano()) {
b.db.index.Delete(key)
return ErrKeyNotFound
}
// now we get the value from wal, update the expiry time
// and rewrite the record to pendingWrites
record.Expire = now.Add(ttl).UnixNano()
b.pendingWrites = append(b.pendingWrites, record)
now := time.Now()
record = decodeLogRecord(chunk)
// if the record is deleted or expired, we can assume that the key does not exist,
// and delete the key from the index
if record.Type == LogRecordDeleted || record.IsExpired(now.UnixNano()) {
b.db.index.Delete(key)
return ErrKeyNotFound
}
// now we get the value from wal, update the expiry time
// and rewrite the record to pendingWrites
record.Expire = now.Add(ttl).UnixNano()
b.appendPendingWrites(key, record)

return nil
}
Expand All @@ -376,27 +346,17 @@ func (b *Batch) TTL(key []byte) (time.Duration, error) {
b.mu.Lock()
defer b.mu.Unlock()

// check if the key exists in pendingWrites
if len(b.pendingWrites) > 0 {
var record *LogRecord
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
var record = b.lookupPendingWrites(key)
if record != nil {
if record.Expire == 0 {
return -1, nil
}
// if the key exists in pendingWrites, return the ttl directly
if record != nil {
if record.Expire == 0 {
return -1, nil
}
// return key not found if the record is deleted or expired
if record.Type == LogRecordDeleted || record.IsExpired(now.UnixNano()) {
return -1, ErrKeyNotFound
}
// now we get the valid expiry time, we can calculate the ttl
return time.Duration(record.Expire - now.UnixNano()), nil
// return key not found if the record is deleted or expired
if record.Type == LogRecordDeleted || record.IsExpired(now.UnixNano()) {
return -1, ErrKeyNotFound
}
// now we get the valid expiry time, we can calculate the ttl
return time.Duration(record.Expire - now.UnixNano()), nil
}

// if the key does not exist in pendingWrites, get the value from wal
Expand All @@ -410,7 +370,7 @@ func (b *Batch) TTL(key []byte) (time.Duration, error) {
}

// return key not found if the record is deleted or expired
record := decodeLogRecord(chunk)
record = decodeLogRecord(chunk)
if record.Type == LogRecordDeleted {
return -1, ErrKeyNotFound
}
Expand Down Expand Up @@ -443,47 +403,41 @@ func (b *Batch) Persist(key []byte) error {
defer b.mu.Unlock()

// if the key exists in pendingWrites, update the expiry time directly
var record *LogRecord
for i := len(b.pendingWrites) - 1; i >= 0; i-- {
if bytes.Equal(key, b.pendingWrites[i].Key) {
record = b.pendingWrites[i]
break
}
}

var record = b.lookupPendingWrites(key)
if record != nil {
if record.Type == LogRecordDeleted && record.IsExpired(time.Now().UnixNano()) {
return ErrKeyNotFound
}
record.Expire = 0
} else {
// check if the key exists in index
position := b.db.index.Get(key)
if position == nil {
return ErrKeyNotFound
}
chunk, err := b.db.dataFiles.Read(position)
if err != nil {
return err
}
return nil
}

record := decodeLogRecord(chunk)
now := time.Now().UnixNano()
// check if the record is deleted or expired
if record.Type == LogRecordDeleted || record.IsExpired(now) {
b.db.index.Delete(record.Key)
return ErrKeyNotFound
}
// if the expiration time is 0, it means that the key has no expiration time,
// so we can return directly
if record.Expire == 0 {
return nil
}
// check if the key exists in index
position := b.db.index.Get(key)
if position == nil {
return ErrKeyNotFound
}
chunk, err := b.db.dataFiles.Read(position)
if err != nil {
return err
}

// set the expiration time to 0, and rewrite the record to wal
record.Expire = 0
b.pendingWrites = append(b.pendingWrites, record)
record = decodeLogRecord(chunk)
now := time.Now().UnixNano()
// check if the record is deleted or expired
if record.Type == LogRecordDeleted || record.IsExpired(now) {
b.db.index.Delete(record.Key)
return ErrKeyNotFound
}
// if the expiration time is 0, it means that the key has no expiration time,
// so we can return directly
if record.Expire == 0 {
return nil
}

// set the expiration time to 0, and rewrite the record to wal
record.Expire = 0
b.appendPendingWrites(key, record)

return nil
}
Expand Down Expand Up @@ -602,8 +556,31 @@ func (b *Batch) Rollback() error {
b.db.recordPool.Put(record)
}
b.pendingWrites = b.pendingWrites[:0]
for key := range b.pendingWritesMap {
delete(b.pendingWritesMap, key)
}
}

b.rollbacked = true
return nil
}

// lookupPendingWrites if the key exists in pendingWrites, update the value directly
func (b *Batch) lookupPendingWrites(key []byte) (record *LogRecord) {
if len(b.pendingWritesMap) == 0 {
return
}
if idx, found := b.pendingWritesMap[string(key)]; found {
record = b.pendingWrites[idx]
return
}
return
}

func (b *Batch) appendPendingWrites(key []byte, record *LogRecord) {
b.pendingWrites = append(b.pendingWrites, record)
if b.pendingWritesMap == nil {
b.pendingWritesMap = make(map[string]int)
}
b.pendingWritesMap[string(key)] = len(b.pendingWrites) - 1
}
Loading

0 comments on commit c9de438

Please sign in to comment.