Permalink
Browse files

improve redis locking and putbacks

  • Loading branch information...
1 parent cbba7e4 commit 66f1237d67ef64bafcab86bb26d227ee05ef1f23 ♠ ace hacker committed May 8, 2013
Showing with 34 additions and 16 deletions.
  1. +34 −16 store/redis_store.go
View
@@ -8,9 +8,16 @@ import (
"l2met/bucket"
"l2met/encoding"
"l2met/utils"
+ "strconv"
"time"
)
+var lockPrefix, partitionPrefix string
+func init() {
+ lockPrefix = "lock"
+ partitionPrefix = "partition.outlet"
+}
+
var PartitionTable = crc64.MakeTable(crc64.ISO)
type RedisStore struct {
@@ -60,15 +67,16 @@ func (s *RedisStore) Health() bool {
func (s *RedisStore) Scan(schedule time.Time) (<-chan *bucket.Bucket, error) {
retBuckets := make(chan *bucket.Bucket)
- partition, err := s.lockPartition()
+ p, err := s.lockPartition()
if err != nil {
return retBuckets, err
}
+ partition := partitionPrefix + "." + strconv.Itoa(int(p))
go func(out chan *bucket.Bucket) {
rc := s.redisPool.Get()
defer rc.Close()
defer close(out)
- defer s.unlockPartition(partition)
+ defer s.unlockPartition(p)
rc.Send("MULTI")
rc.Send("SMEMBERS", partition)
rc.Send("DEL", partition)
@@ -90,19 +98,29 @@ func (s *RedisStore) Scan(schedule time.Time) (<-chan *bucket.Bucket, error) {
if !bucketReady.After(schedule) {
out <- &bucket.Bucket{Id: id}
} else {
- s.putback(partition, id)
+ if err := s.putback(id); err != nil {
+ fmt.Printf("putback-error=%s\n", err)
+ }
}
}
}(retBuckets)
return retBuckets, nil
}
-func (s *RedisStore) putback(partition string, id *bucket.Id) error {
+func (s *RedisStore) putback(id *bucket.Id) error {
defer utils.MeasureT("bucket.putback", time.Now())
rc := s.redisPool.Get()
defer rc.Close()
- _, err := rc.Do("SADD", partition, id.String())
- return err
+ key := id.String()
+ partition := s.bucketPartition([]byte(key))
+ rc.Send("MULTI")
+ rc.Send("SADD", partition, key)
+ rc.Send("EXPIRE", partition, 300)
+ _, err := rc.Do("EXEC")
+ if err != nil {
+ return err
+ }
+ return nil
}
func (s *RedisStore) Put(b *bucket.Bucket) error {
@@ -117,7 +135,7 @@ func (s *RedisStore) Put(b *bucket.Bucket) error {
b.Unlock()
//TODO(ryandotsmith): Ensure consistent keys are being written.
- partition := s.bucketPartition("outlet", []byte(key))
+ partition := s.bucketPartition([]byte(key))
rc.Send("MULTI")
rc.Send("RPUSH", key, value)
rc.Send("EXPIRE", key, 300)
@@ -151,27 +169,27 @@ func (s *RedisStore) Get(b *bucket.Bucket) error {
return nil
}
-func (s *RedisStore) bucketPartition(prefix string, b []byte) string {
+func (s *RedisStore) bucketPartition(b []byte) string {
check := crc64.Checksum(b, PartitionTable)
- return fmt.Sprintf("%s.%d", prefix, check%s.MaxPartitions())
+ return fmt.Sprintf("%s.%d", partitionPrefix, check%s.MaxPartitions())
}
-func (s *RedisStore) lockPartition() (string, error) {
+func (s *RedisStore) lockPartition() (uint64, error) {
for {
for p := uint64(0); p < s.MaxPartitions(); p++ {
- name := fmt.Sprintf("lock.%d", p)
+ name := fmt.Sprintf("%s.%d", lockPrefix, p)
//TODO(ryandotsmith): remove magic number.
locked, err := s.writeLock(name, 5)
if err != nil {
- return "", err
+ return 0, err
}
if locked {
- return name, nil
+ return p, nil
}
}
time.Sleep(time.Second * 5)
}
- return "", errors.New("LockPartition impossible broke the loop.")
+ return 0, errors.New("LockPartition impossible broke the loop.")
}
func (s *RedisStore) writeLock(name string, ttl uint64) (bool, error) {
@@ -189,9 +207,9 @@ func (s *RedisStore) writeLock(name string, ttl uint64) (bool, error) {
return new > int64(old), nil
}
-func (s *RedisStore) unlockPartition(p string) error {
+func (s *RedisStore) unlockPartition(p uint64) error {
rc := s.redisPool.Get()
defer rc.Close()
- _, err := rc.Do("DEL", p)
+ _, err := rc.Do("DEL", lockPrefix + "." + strconv.Itoa(int(p)))
return err
}

0 comments on commit 66f1237

Please sign in to comment.