Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LFU Cache Fixes #7479

Merged
merged 9 commits into from
Feb 12, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions dev.env
@@ -1,13 +1,13 @@
# No shebang line as this script is sourced from an external shell.

# Copyright 2019 The Vitess Authors.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -34,6 +34,6 @@ export PATH

# According to https://github.com/etcd-io/etcd/blob/a621d807f061e1dd635033a8d6bc261461429e27/Documentation/op-guide/supported-platform.md,
# currently, etcd is unstable on arm64, so ETCD_UNSUPPORTED_ARCH should be set.
if [ "$(uname -m)" == aarch64 ]; then
if [ "$(uname -m)" = aarch64 ]; then
export ETCD_UNSUPPORTED_ARCH=arm64
fi
2 changes: 1 addition & 1 deletion go/cache/cache.go
Expand Up @@ -76,5 +76,5 @@ type Config struct {
var DefaultConfig = &Config{
MaxEntries: 5000,
MaxMemoryUsage: 32 * 1024 * 1024,
LFU: false,
LFU: true,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
73 changes: 7 additions & 66 deletions go/cache/ristretto/bloom/bbloom.go
Expand Up @@ -21,9 +21,6 @@
package bloom

import (
"bytes"
"encoding/json"
"log"
"math"
"unsafe"
)
Expand All @@ -43,26 +40,16 @@ func getSize(ui64 uint64) (size uint64, exponent uint64) {
return size, exponent
}

func calcSizeByWrongPositives(numEntries, wrongs float64) (uint64, uint64) {
size := -1 * numEntries * math.Log(wrongs) / math.Pow(float64(0.69314718056), 2)
locs := math.Ceil(float64(0.69314718056) * size / numEntries)
return uint64(size), uint64(locs)
// NewBloomFilterWithErrorRate returns a new bloomfilter with optimal size for the given
// error rate
func NewBloomFilterWithErrorRate(numEntries uint64, wrongs float64) *Bloom {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

size := -1 * float64(numEntries) * math.Log(wrongs) / math.Pow(0.69314718056, 2)
locs := math.Ceil(0.69314718056 * size / float64(numEntries))
return NewBloomFilter(uint64(size), uint64(locs))
}

// NewBloomFilter returns a new bloomfilter.
func NewBloomFilter(params ...float64) (bloomfilter *Bloom) {
var entries, locs uint64
if len(params) == 2 {
if params[1] < 1 {
entries, locs = calcSizeByWrongPositives(params[0], params[1])
} else {
entries, locs = uint64(params[0]), uint64(params[1])
}
} else {
log.Fatal("usage: New(float64(number_of_entries), float64(number_of_hashlocations))" +
" i.e. New(float64(1000), float64(3)) or New(float64(number_of_entries)," +
" float64(number_of_hashlocations)) i.e. New(float64(1000), float64(0.03))")
}
func NewBloomFilter(entries, locs uint64) (bloomfilter *Bloom) {
size, exponent := getSize(entries)
bloomfilter = &Bloom{
sizeExp: exponent,
Expand Down Expand Up @@ -162,49 +149,3 @@ func (bl *Bloom) IsSet(idx uint64) bool {
r := ((*(*uint8)(ptr)) >> (idx % 8)) & 1
return r == 1
}

// bloomJSONImExport
// Im/Export structure used by JSONMarshal / JSONUnmarshal
type bloomJSONImExport struct {
FilterSet []byte
SetLocs uint64
}

// NewWithBoolset takes a []byte slice and number of locs per entry,
// returns the bloomfilter with a bitset populated according to the input []byte.
func newWithBoolset(bs *[]byte, locs uint64) *Bloom {
bloomfilter := NewBloomFilter(float64(len(*bs)<<3), float64(locs))
for i, b := range *bs {
*(*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(&bloomfilter.bitset[0])) + uintptr(i))) = b
}
return bloomfilter
}

// JSONUnmarshal takes JSON-Object (type bloomJSONImExport) as []bytes
// returns bloom32 / bloom64 object.
func JSONUnmarshal(dbData []byte) (*Bloom, error) {
bloomImEx := bloomJSONImExport{}
if err := json.Unmarshal(dbData, &bloomImEx); err != nil {
return nil, err
}
buf := bytes.NewBuffer(bloomImEx.FilterSet)
bs := buf.Bytes()
bf := newWithBoolset(&bs, bloomImEx.SetLocs)
return bf, nil
}

// JSONMarshal returns JSON-object (type bloomJSONImExport) as []byte.
func (bl Bloom) JSONMarshal() []byte {
bloomImEx := bloomJSONImExport{}
bloomImEx.SetLocs = bl.setLocs
bloomImEx.FilterSet = make([]byte, len(bl.bitset)<<3)
for i := range bloomImEx.FilterSet {
bloomImEx.FilterSet[i] = *(*byte)(unsafe.Pointer(uintptr(unsafe.Pointer(&bl.bitset[0])) +
uintptr(i)))
}
data, err := json.Marshal(bloomImEx)
if err != nil {
log.Fatal("json.Marshal failed: ", err)
}
return data
}
41 changes: 5 additions & 36 deletions go/cache/ristretto/bloom/bbloom_test.go
Expand Up @@ -6,14 +6,12 @@ import (
"os"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/hack"
)

var (
wordlist1 [][]byte
n = 1 << 16
n = uint64(1 << 16)
bf *Bloom
)

Expand All @@ -31,7 +29,7 @@ func TestMain(m *testing.M) {
}

func TestM_NumberOfWrongs(t *testing.T) {
bf = NewBloomFilter(float64(n*10), float64(7))
bf = NewBloomFilter(n*10, 7)

cnt := 0
for i := range wordlist1 {
Expand All @@ -44,43 +42,14 @@ func TestM_NumberOfWrongs(t *testing.T) {

}

func TestM_JSON(t *testing.T) {
const shallBe = int(1 << 16)

bf = NewBloomFilter(float64(n*10), float64(7))

cnt := 0
for i := range wordlist1 {
hash := hack.RuntimeMemhash(wordlist1[i], 0)
if !bf.AddIfNotHas(hash) {
cnt++
}
}

jsonm := bf.JSONMarshal()

// create new bloomfilter from bloomfilter's JSON representation
bf2, err := JSONUnmarshal(jsonm)
require.NoError(t, err)

cnt2 := 0
for i := range wordlist1 {
hash := hack.RuntimeMemhash(wordlist1[i], 0)
if !bf2.AddIfNotHas(hash) {
cnt2++
}
}
require.Equal(t, shallBe, cnt2)
}

// BenchmarkM_New measures the cost of allocating a fresh Bloom filter
// sized for n*10 entries with 7 hash locations.
func BenchmarkM_New(b *testing.B) {
	for iter := b.N; iter > 0; iter-- {
		_ = NewBloomFilter(n*10, 7)
	}
}

func BenchmarkM_Clear(b *testing.B) {
bf = NewBloomFilter(float64(n*10), float64(7))
bf = NewBloomFilter(n*10, 7)
for i := range wordlist1 {
hash := hack.RuntimeMemhash(wordlist1[i], 0)
bf.Add(hash)
Expand All @@ -92,7 +61,7 @@ func BenchmarkM_Clear(b *testing.B) {
}

func BenchmarkM_Add(b *testing.B) {
bf = NewBloomFilter(float64(n*10), float64(7))
bf = NewBloomFilter(n*10, 7)
b.ResetTimer()
for r := 0; r < b.N; r++ {
for i := range wordlist1 {
Expand Down
5 changes: 3 additions & 2 deletions go/cache/ristretto/cache.go
Expand Up @@ -45,7 +45,8 @@ func defaultStringHash(key string) (uint64, uint64) {

type itemCallback func(*Item)

const itemSize = int64(unsafe.Sizeof(storeItem{}))
// CacheItemSize is the overhead in bytes for every stored cache item
const CacheItemSize = int64(unsafe.Sizeof(storeItem{}))

// Cache is a thread-safe implementation of a hashmap with a TinyLFU admission
// policy and a Sampled LFU eviction policy. You can use the same Cache instance
Expand Down Expand Up @@ -446,7 +447,7 @@ func (c *Cache) processItems() {
}
if !c.ignoreInternalCost {
// Add the cost of internally storing the object.
i.Cost += itemSize
i.Cost += CacheItemSize
}

switch i.flag {
Expand Down
2 changes: 1 addition & 1 deletion go/cache/ristretto/policy.go
Expand Up @@ -375,7 +375,7 @@ type tinyLFU struct {
// newTinyLFU builds a TinyLFU admission policy: a count-min sketch for
// frequency estimates plus a bloom-filter "doorkeeper" with a 1%
// false-positive rate. resetAt is initialized to numCounters — presumably
// the increment count at which the counters are aged; confirm against the
// tinyLFU increment path.
func newTinyLFU(numCounters int64) *tinyLFU {
	lfu := &tinyLFU{}
	lfu.freq = newCmSketch(numCounters)
	lfu.door = bloom.NewBloomFilterWithErrorRate(uint64(numCounters), 0.01)
	lfu.resetAt = numCounters
	return lfu
}
Expand Down
6 changes: 3 additions & 3 deletions go/mysql/fakesqldb/server.go
Expand Up @@ -50,8 +50,8 @@ const appendEntry = -1
type DB struct {
// Fields set at construction time.

// t is our testing.T instance
t *testing.T
// t is our testing.TB instance
t testing.TB

// listener is our mysql.Listener.
listener *mysql.Listener
Expand Down Expand Up @@ -151,7 +151,7 @@ type ExpectedExecuteFetch struct {
}

// New creates a server, and starts listening.
func New(t *testing.T) *DB {
func New(t testing.TB) *DB {
// Pick a path for our socket.
socketDir, err := ioutil.TempDir("", "fakesqldb")
if err != nil {
Expand Down
15 changes: 8 additions & 7 deletions go/vt/vttablet/endtoend/config_test.go
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/cache"
"vitess.io/vitess/go/cache/ristretto"
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -178,9 +179,8 @@ func TestConsolidatorReplicasOnly(t *testing.T) {

func TestQueryPlanCache(t *testing.T) {
if cache.DefaultConfig.LFU {
const cacheItemSize = 40
const cachedPlanSize = 2275 + cacheItemSize
const cachePlanSize2 = 2254 + cacheItemSize
const cachedPlanSize = 2352 + int(ristretto.CacheItemSize)
const cachePlanSize2 = 2326 + int(ristretto.CacheItemSize)
testQueryPlanCache(t, cachedPlanSize, cachePlanSize2)
} else {
testQueryPlanCache(t, 1, 1)
Expand All @@ -203,22 +203,23 @@ func testQueryPlanCache(t *testing.T, cachedPlanSize, cachePlanSize2 int) {
client := framework.NewClient()
_, _ = client.Execute("select * from vitess_test where intval=:ival1", bindVars)
_, _ = client.Execute("select * from vitess_test where intval=:ival2", bindVars)
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 1, framework.Server.QueryPlanCacheLen())

vend := framework.DebugVars()
verifyIntValue(t, vend, "QueryCacheLength", 1)
// verifyIntValue(t, vend, "QueryCacheLength", 1)
vmg marked this conversation as resolved.
Show resolved Hide resolved
verifyIntValue(t, vend, "QueryCacheSize", cachedPlanSize)
verifyIntValue(t, vend, "QueryCacheCapacity", cachedPlanSize)

framework.Server.SetQueryPlanCacheCap(64 * 1024)
_, _ = client.Execute("select * from vitess_test where intval=:ival1", bindVars)
time.Sleep(100 * time.Millisecond)
require.Equal(t, 2, framework.Server.QueryPlanCacheLen())

vend = framework.DebugVars()
verifyIntValue(t, vend, "QueryCacheLength", 2)
verifyIntValue(t, vend, "QueryCacheSize", cachedPlanSize*2)

_, _ = client.Execute("select * from vitess_test where intval=1", bindVars)
time.Sleep(100 * time.Millisecond)
require.Equal(t, 3, framework.Server.QueryPlanCacheLen())

vend = framework.DebugVars()
verifyIntValue(t, vend, "QueryCacheLength", 3)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/endtoend/queries_test.go
Expand Up @@ -18,6 +18,7 @@ package endtoend

import (
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -1783,6 +1784,10 @@ func TestQueries(t *testing.T) {
},
},
}

// Wait for the vtgate caches to flush
time.Sleep(1 * time.Second)

for _, tcase := range testCases {
if err := tcase.Test("", client); err != nil {
t.Error(err)
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/query_engine.go
Expand Up @@ -413,6 +413,12 @@ func (qe *QueryEngine) QueryPlanCacheCap() int {
return int(qe.plans.MaxCapacity())
}

// QueryPlanCacheLen returns the number of entries currently held in the
// query plan cache. It calls Wait first so that any pending cache writes
// are processed before the length is read.
func (qe *QueryEngine) QueryPlanCacheLen() int {
	plans := qe.plans
	plans.Wait()
	return plans.Len()
}

// AddStats adds the given stats for the planName.tableName
func (qe *QueryEngine) AddStats(planName, tableName string, queryCount int64, duration, mysqlTime time.Duration, rowCount, errorCount int64) {
// table names can contain "." characters, replace them!
Expand Down