Permalink
Browse files

Testing a sharded cache. Could be useful for massively parallel appli…

…cations
  • Loading branch information...
1 parent 0f0584a commit 52c269d8ae70f3a3de9a0279850ea58425c0a76c @patrickmn committed Jun 22, 2012
Showing with 216 additions and 18 deletions.
  1. +143 −18 cache.go
  2. +73 −0 cache_test.go
View
161 cache.go
@@ -1,8 +1,10 @@
package cache
import (
+ "encoding/binary"
"encoding/gob"
"fmt"
+ "hash/fnv"
"io"
"os"
"reflect"
@@ -277,20 +279,19 @@ func (j *janitor) Run(c *cache) {
}
}
-func (j *janitor) Stop() {
- j.stop <- true
+func stopJanitor(c *Cache) {
+ c.janitor.stop <- true
}
-func stopJanitor(c *Cache) {
- c.janitor.Stop()
+func runJanitor(c *cache, ci time.Duration) {
+ j := &janitor{
+ Interval: ci,
+ }
+ c.janitor = j
+ go j.Run(c)
}
-// Return a new cache with a given default expiration duration and cleanup
-// interval. If the expiration duration is less than 1, the items in the cache
-// never expire (by default), and must be deleted manually. If the cleanup
-// interval is less than one, expired items are not deleted from the cache
-// before their next lookup or before calling DeleteExpired.
-func New(de, ci time.Duration) *Cache {
+func newCache(de time.Duration) *cache {
if de == 0 {
de = -1
}
@@ -299,21 +300,145 @@ func New(de, ci time.Duration) *Cache {
Items: map[string]*Item{},
mu: sync.Mutex{},
}
- if ci > 0 {
- j := &janitor{
- Interval: ci,
- }
- c.janitor = j
- go j.Run(c)
- }
+ return c
+}
+
+// Return a new cache with a given default expiration duration and cleanup
+// interval. If the expiration duration is less than 1, the items in the cache
+// never expire (by default), and must be deleted manually. If the cleanup
+// interval is less than one, expired items are not deleted from the cache
+// before their next lookup or before calling DeleteExpired.
+func New(defaultExpiration, cleanupInterval time.Duration) *Cache {
+ c := newCache(defaultExpiration)
// This trick ensures that the janitor goroutine (which--granted it
// was enabled--is running DeleteExpired on c forever) does not keep
// the returned C object from being garbage collected. When it is
// garbage collected, the finalizer stops the janitor goroutine, after
// which c can be collected.
C := &Cache{c}
- if ci > 0 {
+ if cleanupInterval > 0 {
+ runJanitor(c, cleanupInterval)
runtime.SetFinalizer(C, stopJanitor)
}
return C
}
+
+type ShardedCache struct {
+ *shardedCache
+}
+
+type shardedCache struct {
+ m uint32
+ cs []*cache
+ janitor *shardedJanitor
+}
+
+func (sc *shardedCache) index(k string) uint32 {
+ h := fnv.New32()
+ h.Write([]byte(k))
+ n := binary.BigEndian.Uint32(h.Sum(nil))
+ return n % sc.m
+}
+
+func (sc *shardedCache) Set(k string, x interface{}, d time.Duration) {
+ sc.cs[sc.index(k)].Set(k, x, d)
+}
+
+func (sc *shardedCache) Add(k string, x interface{}, d time.Duration) error {
+ return sc.cs[sc.index(k)].Add(k, x, d)
+}
+
+func (sc *shardedCache) Replace(k string, x interface{}, d time.Duration) error {
+ return sc.cs[sc.index(k)].Replace(k, x, d)
+}
+
+func (sc *shardedCache) Get(k string) (interface{}, bool) {
+ return sc.cs[sc.index(k)].Get(k)
+}
+
+func (sc *shardedCache) Increment(k string, n int64) error {
+ return sc.cs[sc.index(k)].Increment(k, n)
+}
+
+func (sc *shardedCache) IncrementFloat(k string, n float64) error {
+ return sc.cs[sc.index(k)].IncrementFloat(k, n)
+}
+
+func (sc *shardedCache) Decrement(k string, n int64) error {
+ return sc.cs[sc.index(k)].Decrement(k, n)
+}
+
+func (sc *shardedCache) Delete(k string) {
+ sc.cs[sc.index(k)].Delete(k)
+}
+
+func (sc *shardedCache) DeleteExpired() {
+ for _, v := range sc.cs {
+ v.DeleteExpired()
+ }
+}
+
+func (sc *shardedCache) Flush() {
+ for _, v := range sc.cs {
+ v.Flush()
+ }
+}
+
+type shardedJanitor struct {
+ Interval time.Duration
+ stop chan bool
+}
+
+func (j *shardedJanitor) Run(sc *shardedCache) {
+ j.stop = make(chan bool)
+ tick := time.Tick(j.Interval)
+ for {
+ select {
+ case <-tick:
+ sc.DeleteExpired()
+ case <-j.stop:
+ return
+ }
+ }
+}
+
+func stopShardedJanitor(sc *ShardedCache) {
+ sc.janitor.stop <- true
+}
+
+func runShardedJanitor(sc *shardedCache, ci time.Duration) {
+ j := &shardedJanitor{
+ Interval: ci,
+ }
+ sc.janitor = j
+ go j.Run(sc)
+}
+
+func newShardedCache(n int, de time.Duration) *shardedCache {
+ sc := &shardedCache{
+ m: uint32(n - 1),
+ cs: make([]*cache, n),
+ }
+ for i := 0; i < n; i++ {
+ c := &cache{
+ DefaultExpiration: de,
+ Items: map[string]*Item{},
+ mu: sync.Mutex{},
+ }
+ sc.cs[i] = c
+ }
+ return sc
+}
+
+func NewSharded(shards int, defaultExpiration, cleanupInterval time.Duration) *ShardedCache {
+ if defaultExpiration == 0 {
+ defaultExpiration = -1
+ }
+ sc := newShardedCache(shards, defaultExpiration)
+ SC := &ShardedCache{sc}
+ if cleanupInterval > 0 {
+ runShardedJanitor(sc, cleanupInterval)
+ runtime.SetFinalizer(SC, stopShardedJanitor)
+ }
+ return SC
+}
View
@@ -4,6 +4,7 @@ import (
"bytes"
"io/ioutil"
"runtime"
+ "strconv"
"sync"
"testing"
"time"
@@ -636,18 +637,22 @@ func TestSerializeUnserializable(t *testing.T) {
}
func BenchmarkCacheGet(b *testing.B) {
+ b.StopTimer()
tc := New(0, 0)
tc.Set("foo", "bar", 0)
+ b.StartTimer()
for i := 0; i < b.N; i++ {
tc.Get("foo")
}
}
func BenchmarkMutexMapGet(b *testing.B) {
+ b.StopTimer()
m := map[string]string{
"foo": "bar",
}
mu := sync.Mutex{}
+ b.StartTimer()
for i := 0; i < b.N; i++ {
mu.Lock()
_, _ = m["foo"]
@@ -656,12 +661,14 @@ func BenchmarkMutexMapGet(b *testing.B) {
}
func BenchmarkCacheGetConcurrent(b *testing.B) {
+ b.StopTimer()
tc := New(0, 0)
tc.Set("foo", "bar", 0)
wg := new(sync.WaitGroup)
workers := runtime.NumCPU()
each := b.N / workers
wg.Add(workers)
+ b.StartTimer()
for i := 0; i < workers; i++ {
go func() {
for j := 0; j < each; j++ {
@@ -674,6 +681,7 @@ func BenchmarkCacheGetConcurrent(b *testing.B) {
}
func BenchmarkMutexMapGetConcurrent(b *testing.B) {
+ b.StopTimer()
m := map[string]string{
"foo": "bar",
}
@@ -682,6 +690,7 @@ func BenchmarkMutexMapGetConcurrent(b *testing.B) {
workers := runtime.NumCPU()
each := b.N / workers
wg.Add(workers)
+ b.StartTimer()
for i := 0; i < workers; i++ {
go func() {
for j := 0; j < each; j++ {
@@ -695,16 +704,72 @@ func BenchmarkMutexMapGetConcurrent(b *testing.B) {
wg.Wait()
}
+func BenchmarkCacheGetManyConcurrent(b *testing.B) {
+ // This is the same as BenchmarkCacheGetConcurrent, but its result
+ // can be compared against BenchmarkShardedCacheGetManyConcurrent.
+ b.StopTimer()
+ n := 10000
+ tc := New(0, 0)
+ keys := make([]string, n)
+ for i := 0; i < n; i++ {
+ k := "foo" + strconv.Itoa(n)
+ keys[i] = k
+ tc.Set(k, "bar", 0)
+ }
+ each := b.N / n
+ wg := new(sync.WaitGroup)
+ wg.Add(n)
+ for _, v := range keys {
+ go func() {
+ for j := 0; j < each; j++ {
+ tc.Get(v)
+ }
+ wg.Done()
+ }()
+ }
+ b.StartTimer()
+ wg.Wait()
+}
+
+func BenchmarkShardedCacheGetManyConcurrent(b *testing.B) {
+ b.StopTimer()
+ n := 10000
+ tsc := NewSharded(20, 0, 0)
+ keys := make([]string, n)
+ for i := 0; i < n; i++ {
+ k := "foo" + strconv.Itoa(n)
+ keys[i] = k
+ tsc.Set(k, "bar", 0)
+ }
+ each := b.N / n
+ wg := new(sync.WaitGroup)
+ wg.Add(n)
+ for _, v := range keys {
+ go func() {
+ for j := 0; j < each; j++ {
+ tsc.Get(v)
+ }
+ wg.Done()
+ }()
+ }
+ b.StartTimer()
+ wg.Wait()
+}
+
func BenchmarkCacheSet(b *testing.B) {
+ b.StopTimer()
tc := New(0, 0)
+ b.StartTimer()
for i := 0; i < b.N; i++ {
tc.Set("foo", "bar", 0)
}
}
func BenchmarkMutexMapSet(b *testing.B) {
+ b.StopTimer()
m := map[string]string{}
mu := sync.Mutex{}
+ b.StartTimer()
for i := 0; i < b.N; i++ {
mu.Lock()
m["foo"] = "bar"
@@ -713,16 +778,20 @@ func BenchmarkMutexMapSet(b *testing.B) {
}
func BenchmarkCacheSetDelete(b *testing.B) {
+ b.StopTimer()
tc := New(0, 0)
+ b.StartTimer()
for i := 0; i < b.N; i++ {
tc.Set("foo", "bar", 0)
tc.Delete("foo")
}
}
func BenchmarkMutexMapSetDelete(b *testing.B) {
+ b.StopTimer()
m := map[string]string{}
mu := sync.Mutex{}
+ b.StartTimer()
for i := 0; i < b.N; i++ {
mu.Lock()
m["foo"] = "bar"
@@ -734,7 +803,9 @@ func BenchmarkMutexMapSetDelete(b *testing.B) {
}
func BenchmarkCacheSetDeleteSingleLock(b *testing.B) {
+ b.StopTimer()
tc := New(0, 0)
+ b.StartTimer()
for i := 0; i < b.N; i++ {
tc.mu.Lock()
tc.set("foo", "bar", 0)
@@ -744,8 +815,10 @@ func BenchmarkCacheSetDeleteSingleLock(b *testing.B) {
}
func BenchmarkMutexMapSetDeleteSingleLock(b *testing.B) {
+ b.StopTimer()
m := map[string]string{}
mu := sync.Mutex{}
+ b.StartTimer()
for i := 0; i < b.N; i++ {
mu.Lock()
m["foo"] = "bar"

0 comments on commit 52c269d

Please sign in to comment.