New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor, add pluggable cache framework #740

Merged
merged 3 commits into from Sep 7, 2017
Jump to file or symbol
Failed to load files and symbols.
+126 −31
Diff settings

Always

Just for now

View
@@ -346,15 +346,15 @@ type clusterInfo struct {
regions *regionsInfo
activeRegions int
writeStatistics *cache.LRU
writeStatistics cache.Cache
}
func newClusterInfo(id IDAllocator) *clusterInfo {
return &clusterInfo{
id: id,
stores: newStoresInfo(),
regions: newRegionsInfo(),
writeStatistics: cache.NewLRU(writeStatLRUMaxLen),
writeStatistics: cache.NewDefaultCache(writeStatCacheMaxLen),
}
}
@@ -786,10 +786,10 @@ func (c *clusterInfo) updateWriteStatus(region *core.RegionInfo) {
region.WrittenBytes = WrittenBytesPerSec
// hotRegionThreshold is used to pick hot regions
// suppose the number of the hot regions is writeStatLRUMaxLen
// suppose the number of the hot regions is writeStatCacheMaxLen
// and we use total written Bytes past storeHeartBeatReportInterval seconds to divide the number of hot regions
// divide 2 because the store reports data about two times than the region record write to rocksdb
divisor := float64(writeStatLRUMaxLen) * 2 * storeHeartBeatReportInterval
divisor := float64(writeStatCacheMaxLen) * 2 * storeHeartBeatReportInterval
hotRegionThreshold := uint64(float64(c.stores.totalWrittenBytes()) / divisor)
if hotRegionThreshold < hotRegionMinWriteRate {
View
@@ -0,0 +1,115 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package cache
import (
"sync"
)
// Cache is an interface for a cache system.
type Cache interface {
	// Put puts an item into cache.
	Put(key uint64, value interface{})
	// Get retrieves an item from cache.
	Get(key uint64) (interface{}, bool)
	// Peek reads an item from cache. The action is not considered a 'Use'.
	Peek(key uint64) (interface{}, bool)
	// Remove eliminates an item from cache.
	Remove(key uint64)
	// Elems returns all items in cache.
	Elems() []*Item
	// Len returns the current cache size.
	Len() int
}
// Type is the cache's type, such as LRUCache.
type Type int

const (
	// LRUCache is the LRU cache type.
	LRUCache Type = 1
)

var (
	// DefaultCacheType sets the default cache type used by the
	// NewDefaultCache function.
	DefaultCacheType = LRUCache
)
// threadSafeCache wraps a non-thread-safe Cache implementation and
// serializes access to it with an RWMutex.
type threadSafeCache struct {
	cache Cache
	lock  sync.RWMutex
}

// newThreadSafeCache returns the given cache wrapped so that it is
// safe for concurrent use.
func newThreadSafeCache(cache Cache) Cache {
	return &threadSafeCache{
		cache: cache,
	}
}
// Put inserts value under key while holding the write lock.
func (c *threadSafeCache) Put(key uint64, value interface{}) {
	c.lock.Lock()
	c.cache.Put(key, value)
	c.lock.Unlock()
}
// Get retrieves an item from cache, marking it as recently used.
//
// NOTE: this takes the full write lock, not the read lock. The
// underlying non-thread-safe implementation mutates internal state on
// Get (the LRU moves the element to the front of its recency list, and
// its original standalone version guarded Get with the write lock), so
// allowing concurrent Gets under an RLock would be a data race.
func (c *threadSafeCache) Get(key uint64) (interface{}, bool) {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.cache.Get(key)
}
// Peek reads the value stored under key without affecting its
// recency; the access is not considered a 'Use'.
func (c *threadSafeCache) Peek(key uint64) (interface{}, bool) {
	c.lock.RLock()
	value, ok := c.cache.Peek(key)
	c.lock.RUnlock()
	return value, ok
}
// Remove drops the entry for key, if present, under the write lock.
func (c *threadSafeCache) Remove(key uint64) {
	c.lock.Lock()
	c.cache.Remove(key)
	c.lock.Unlock()
}
// Elems returns all items currently held in the cache.
func (c *threadSafeCache) Elems() []*Item {
	c.lock.RLock()
	items := c.cache.Elems()
	c.lock.RUnlock()
	return items
}
// Len reports the current number of items in the cache.
func (c *threadSafeCache) Len() int {
	c.lock.RLock()
	n := c.cache.Len()
	c.lock.RUnlock()
	return n
}
// NewCache creates a thread-safe Cache instance of the given size,
// backed by the implementation selected by cacheType. It panics on an
// unknown cache type.
func NewCache(size int, cacheType Type) Cache {
	if cacheType == LRUCache {
		return newThreadSafeCache(newLRU(size))
	}
	panic("Unknown cache type")
}
// NewDefaultCache creates a thread-safe Cache instance of the given
// size using DefaultCacheType.
func NewDefaultCache(size int) Cache {
	return NewCache(size, DefaultCacheType)
}
@@ -79,7 +79,7 @@ func (s *testRegionCacheSuite) TestExpireRegionCache(c *C) {
}
func (s *testRegionCacheSuite) TestLRUCache(c *C) {
cache := NewLRU(3)
cache := newLRU(3)
cache.Put(1, "1")
cache.Put(2, "2")
cache.Put(3, "3")
View
@@ -15,7 +15,6 @@ package cache
import (
"container/list"
"sync"
)
// Item is the cache entry.
@@ -26,8 +25,6 @@ type Item struct {
// LRU is 'Least-Recently-Used' cache.
type LRU struct {
sync.RWMutex
// maxCount is the maximum number of items.
// 0 means no limit.
maxCount int
@@ -36,8 +33,9 @@ type LRU struct {
cache map[uint64]*list.Element
}
// NewLRU returns a new lru cache.
func NewLRU(maxCount int) *LRU {
// newLRU returns a new lru cache. And this LRU cache is not thread-safe
// should not use this function to create LRU cache, use NewCache instead
func newLRU(maxCount int) *LRU {
return &LRU{
maxCount: maxCount,
ll: list.New(),
@@ -47,9 +45,6 @@ func NewLRU(maxCount int) *LRU {
// Put puts an item into cache.
func (c *LRU) Put(key uint64, value interface{}) {
c.Lock()
defer c.Unlock()
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
ele.Value.(*Item).Value = value
@@ -66,9 +61,6 @@ func (c *LRU) Put(key uint64, value interface{}) {
// Get retrieves an item from cache.
func (c *LRU) Get(key uint64) (interface{}, bool) {
c.Lock()
defer c.Unlock()
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
return ele.Value.(*Item).Value, true
@@ -79,9 +71,6 @@ func (c *LRU) Get(key uint64) (interface{}, bool) {
// Peek reads an item from cache. The action is not considered a 'Use'.
func (c *LRU) Peek(key uint64) (interface{}, bool) {
c.RLock()
defer c.RUnlock()
if ele, ok := c.cache[key]; ok {
return ele.Value.(*Item).Value, true
}
@@ -91,9 +80,6 @@ func (c *LRU) Peek(key uint64) (interface{}, bool) {
// Remove eliminates an item from cache.
func (c *LRU) Remove(key uint64) {
c.Lock()
defer c.Unlock()
if ele, ok := c.cache[key]; ok {
c.removeElement(ele)
}
@@ -114,9 +100,6 @@ func (c *LRU) removeElement(ele *list.Element) {
// Elems return all items in cache.
func (c *LRU) Elems() []*Item {
c.RLock()
defer c.RUnlock()
elems := make([]*Item, 0, c.ll.Len())
for ele := c.ll.Front(); ele != nil; ele = ele.Next() {
clone := *(ele.Value.(*Item))
@@ -128,8 +111,5 @@ func (c *LRU) Elems() []*Item {
// Len returns current cache size.
func (c *LRU) Len() int {
c.RLock()
defer c.RUnlock()
return c.ll.Len()
}
View
@@ -39,7 +39,7 @@ const (
minSlowScheduleInterval = time.Second * 3
scheduleIntervalFactor = 1.3
writeStatLRUMaxLen = 1000
writeStatCacheMaxLen = 1000
hotRegionMinWriteRate = 16 * 1024
regionHeartBeatReportInterval = 60
regionheartbeatSendChanCap = 1024
@@ -68,7 +68,7 @@ type coordinator struct {
checker *schedule.ReplicaChecker
operators map[uint64]*schedule.Operator
schedulers map[string]*scheduleController
histories *cache.LRU
histories cache.Cache
hbStreams *heartbeatStreams
}
@@ -83,7 +83,7 @@ func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartb
checker: schedule.NewReplicaChecker(opt, cluster),
operators: make(map[uint64]*schedule.Operator),
schedulers: make(map[string]*scheduleController),
histories: cache.NewLRU(historiesCacheSize),
histories: cache.NewDefaultCache(historiesCacheSize),
hbStreams: hbStreams,
}
}
ProTip! Use n and p to navigate between commits in a pull request.