Skip to content

Commit

Permalink
Back to BTree with some minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
praserx committed Jul 16, 2020
1 parent 16ec488 commit e25a42e
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/emirpasic/gods/trees/btree"
)

// Internal cache errors
Expand All @@ -31,8 +33,8 @@ type AtomicCache struct {
// Deadlock mutex for debugging purpose.
// deadlock.RWMutex

// Lookup structure used for global index. It is based on hashtable structure.
lookup map[string]LookupRecord
// Lookup structure used for global index. It is based on BTree structure.
lookup *btree.Tree

// Shards lookup tables which contains information about shards sections.
smallShards, mediumShards, largeShards ShardsLookup
Expand Down Expand Up @@ -86,7 +88,7 @@ type LookupRecord struct {
}

// BufferItem is used for buffer, which contains all unattended cache set
// requrest.
// request.
type BufferItem struct {
Key []byte
Data []byte
Expand All @@ -103,7 +105,7 @@ func New(opts ...Option) *AtomicCache {
MaxShardsSmall: 256,
MaxShardsMedium: 128,
MaxShardsLarge: 64,
GcStarter: 5000,
GcStarter: 25000,
}

for _, opt := range opts {
Expand All @@ -114,7 +116,7 @@ func New(opts ...Option) *AtomicCache {
cache := &AtomicCache{}

// Init lookup table
cache.lookup = make(map[string]LookupRecord)
cache.lookup = btree.NewWithStringComparator(3)

// Init small shards section
initShardsSection(&cache.smallShards, options.MaxShardsSmall, options.MaxRecords, options.RecordSizeSmall)
Expand Down Expand Up @@ -163,15 +165,15 @@ func (a *AtomicCache) Set(key []byte, data []byte, expire time.Duration) error {
shardSection, shardSectionID := a.getShardsSectionBySize(len(data))

a.Lock()
if ival, ok := a.lookup[string(key)]; !ok {
if ival, ok := a.lookup.Get(string(key)); !ok {
new = true
} else {
val := ival
val := ival.(LookupRecord)

if val.ShardSection != shardSectionID {
shardSection.shards[val.ShardIndex].Free(val.RecordIndex)
val.RecordIndex = shardSection.shards[val.ShardIndex].Set(data)
a.lookup[string(key)] = LookupRecord{ShardIndex: val.ShardIndex, ShardSection: shardSectionID, RecordIndex: val.RecordIndex, Expiration: a.getExprTime(expire)}
a.lookup.Put(string(key), LookupRecord{ShardIndex: val.ShardIndex, ShardSection: shardSectionID, RecordIndex: val.RecordIndex, Expiration: a.getExprTime(expire)})
} else {
prevShardSection := a.getShardsSectionByID(val.ShardSection)
prevShardSection.shards[val.ShardIndex].Free(val.RecordIndex)
Expand All @@ -182,11 +184,11 @@ func (a *AtomicCache) Set(key []byte, data []byte, expire time.Duration) error {
if new {
if si, ok := a.getShard(shardSectionID); ok {
ri := shardSection.shards[si].Set(data)
a.lookup[string(key)] = LookupRecord{ShardIndex: si, ShardSection: shardSectionID, RecordIndex: ri, Expiration: a.getExprTime(expire)}
a.lookup.Put(string(key), LookupRecord{ShardIndex: si, ShardSection: shardSectionID, RecordIndex: ri, Expiration: a.getExprTime(expire)})
} else if si, ok := a.getEmptyShard(shardSectionID); ok {
shardSection.shards[si] = NewShard(a.MaxRecords, a.getRecordSizeByShardSectionID(shardSectionID))
ri := shardSection.shards[si].Set(data)
a.lookup[string(key)] = LookupRecord{ShardIndex: si, ShardSection: shardSectionID, RecordIndex: ri, Expiration: a.getExprTime(expire)}
a.lookup.Put(string(key), LookupRecord{ShardIndex: si, ShardSection: shardSectionID, RecordIndex: ri, Expiration: a.getExprTime(expire)})
} else {
if len(a.buffer) <= int(a.MaxRecords) {
a.buffer = append(a.buffer, BufferItem{Key: key, Data: data, Expire: expire})
Expand Down Expand Up @@ -215,7 +217,8 @@ func (a *AtomicCache) Get(key []byte) ([]byte, error) {
var hit = false

a.RLock()
if val, ok := a.lookup[string(key)]; ok {
if ival, ok := a.lookup.Get(string(key)); ok {
val := ival.(LookupRecord)
shardSection := a.getShardsSectionByID(val.ShardSection)

if shardSection.shards[val.ShardIndex] != nil && time.Now().Before(val.Expiration) {
Expand Down Expand Up @@ -355,14 +358,16 @@ func (a *AtomicCache) getExprTime(expire time.Duration) time.Time {
// active shard).
func (a *AtomicCache) collectGarbage() {
a.Lock()
for k, v := range a.lookup {
for _, k := range a.lookup.Keys() {
iv, _ := a.lookup.Get(k.(string)) // get record
v := iv.(LookupRecord) // convert record from interface to LookupRecord
shardSection := a.getShardsSectionByID(v.ShardSection) // get shard section
if time.Now().After(v.Expiration) {
shardSection.shards[v.ShardIndex].Free(v.RecordIndex)
if len(shardSection.shardsActive) > 1 {
a.releaseShard(v.ShardSection, v.ShardIndex)
}
delete(a.lookup, k)
a.lookup.Remove(k)
}
}

Expand Down

0 comments on commit e25a42e

Please sign in to comment.